RAGFlow-05-GraphRAG模块

0. 模块概览

0.1 职责与定位

GraphRAG 模块是 RAGFlow 的图谱增强检索引擎,通过构建知识图谱增强 RAG 的语义理解能力。主要职责包括:

  1. 实体识别与抽取:从文档中识别实体(人名、地名、组织、概念)
  2. 关系抽取:识别实体间的关系(同义、上下位、因果、时序)
  3. 实体消歧(Entity Resolution):合并同一实体的不同表述
  4. 图谱构建:将实体与关系存储为图结构(NetworkX)
  5. 图谱检索:基于图谱结构扩展查询,提升召回
  6. 社区检测:Leiden 算法划分实体社区,生成层级摘要

0.2 两种模式

Light 模式(轻量级):

  • 基于关键词与词向量的简单图谱
  • 不使用 LLM,成本低、速度快
  • 适合场景:小规模文档、预算有限

General 模式(完整版):

  • 使用 LLM 进行实体识别、关系抽取、实体消歧
  • 支持社区检测与层级摘要
  • 适合场景:大规模文档、高精度要求
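
两种模式通过知识库 parser_config 中的 graphrag 配置切换。下面是一个配置示意(字段名与后文 3.3/3.4 节引用的 graphrag_conf 一致,取值仅供参考,默认值见 5.4 节配置项表):

# 知识库 parser_config 中与 GraphRAG 相关的部分(示意)
graphrag_conf = {
    "method": "general",        # "light" 或 "general"
    "entity_types": ["Person", "Organization", "Location", "Event", "Category"],
    "language": "Chinese",      # 影响抽取与摘要 Prompt 的输出语言
    "resolution": True,         # 是否执行实体消歧
    "community": False,         # 是否执行社区检测与层级摘要
}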

1. 整体服务架构图

1.1 模块层级架构

flowchart TB
    subgraph "API层"
        API["/api/v1/kb/run_graphrag<br/>kb_app.py::run_graphrag()"]
    end

    subgraph "服务层"
        KBSvc[KnowledgebaseService<br/>管理知识库]
        TaskSvc[TaskService<br/>管理任务]
        DocSvc[DocumentService<br/>管理文档]
    end

    subgraph "任务队列层"
        RedisQueue[Redis Stream<br/>ragflow_svr队列]
        TaskExec[TaskExecutor<br/>task_executor.py]
    end

    subgraph "GraphRAG核心层"
        Index[graphrag.general.index<br/>run_graphrag_for_kb()]
        Extractor[GraphExtractor<br/>实体关系抽取]
        Resolution[EntityResolution<br/>实体消歧]
        Community[CommunityReportsExtractor<br/>社区检测与摘要]
        Search[KGSearch<br/>图谱检索]
    end

    subgraph "存储层"
        MySQL[(MySQL<br/>任务/文档元数据)]
        ES[(Elasticsearch<br/>图谱数据/Chunks)]
        Redis[(Redis<br/>分布式锁/缓存)]
    end

    subgraph "外部服务"
        LLM[LLM Service<br/>实体识别/关系抽取/摘要]
        Emb[Embedding Service<br/>向量化/相似度计算]
    end

    API --> KBSvc
    API --> TaskSvc
    API --> DocSvc
    TaskSvc --> RedisQueue
    RedisQueue --> TaskExec
    TaskExec --> Index
    
    Index --> Extractor
    Index --> Resolution
    Index --> Community
    
    Extractor --> LLM
    Extractor --> Emb
    Resolution --> LLM
    Resolution --> Emb
    Community --> LLM
    
    Index --> ES
    Index --> Redis
    Search --> ES
    
    KBSvc --> MySQL
    TaskSvc --> MySQL
    DocSvc --> MySQL

架构说明

  1. API层:接收用户请求,校验权限,创建任务
  2. 服务层:管理知识库、任务、文档的业务逻辑
  3. 任务队列层:异步任务调度,解耦请求与执行
  4. GraphRAG核心层:图谱构建的核心算法实现
  5. 存储层:持久化任务状态、图谱数据、向量索引
  6. 外部服务:LLM和Embedding模型调用

1.2 数据流架构图

flowchart LR
    subgraph "输入"
        Docs[知识库文档]
    end

    subgraph "分块处理"
        Chunks[Chunks<br/>content_with_weight]
    end

    subgraph "模式选择"
        LightMode[Light模式<br/>关键词+共现]
        GeneralMode[General模式<br/>LLM驱动]
    end

    subgraph "子图生成"
        SubGraph1[Doc1 SubGraph]
        SubGraph2[Doc2 SubGraph]
        SubGraphN[DocN SubGraph]
    end

    subgraph "图谱合并"
        MergeGraph[全局图谱<br/>NetworkX Graph]
        PageRank[PageRank计算]
    end

    subgraph "后处理"
        EntityRes[实体消歧<br/>合并同义实体]
        CommDetect[社区检测<br/>Leiden算法]
        CommReport[社区摘要<br/>LLM生成]
    end

    subgraph "存储"
        ESEntity[(ES: entity节点)]
        ESRelation[(ES: relation边)]
        ESCommunity[(ES: community_report)]
    end

    subgraph "检索"
        Query[用户查询]
        KGRetrieval[图谱检索<br/>实体+关系+社区]
        EnhancedResult[增强结果]
    end

    Docs --> Chunks
    Chunks --> LightMode
    Chunks --> GeneralMode
    
    LightMode --> SubGraph1
    GeneralMode --> SubGraph1
    GeneralMode --> SubGraph2
    GeneralMode --> SubGraphN
    
    SubGraph1 --> MergeGraph
    SubGraph2 --> MergeGraph
    SubGraphN --> MergeGraph
    
    MergeGraph --> PageRank
    PageRank --> EntityRes
    EntityRes --> CommDetect
    CommDetect --> CommReport
    
    EntityRes --> ESEntity
    EntityRes --> ESRelation
    CommReport --> ESCommunity
    
    Query --> KGRetrieval
    ESEntity --> KGRetrieval
    ESRelation --> KGRetrieval
    ESCommunity --> KGRetrieval
    KGRetrieval --> EnhancedResult

数据流说明

  1. 输入阶段:文档分块为Chunks
  2. 模式选择:根据配置选择Light或General模式
  3. 并行处理:每个文档并行生成子图
  4. 图谱合并:子图合并为全局图谱并计算PageRank
  5. 后处理:实体消歧、社区检测、生成摘要
  6. 存储索引:图谱数据存入Elasticsearch
  7. 检索增强:查询时从图谱中召回相关内容
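
子图合并与 PageRank 这一步在概念上等价于下面的 NetworkX 操作(仅为示意,真实实现见 graphrag/general/index.py 中的 graph_merge 与 set_graph,还会合并描述、来源等属性):

import networkx as nx

# 两个文档各自抽取出的子图:节点=实体,边=关系,weight=关系强度
sg1 = nx.Graph()
sg1.add_edge("RAGFlow", "LLM", weight=9)
sg2 = nx.Graph()
sg2.add_edge("RAGFlow", "Elasticsearch", weight=8)

# 合并为全局图谱:同名节点自动对齐
global_graph = nx.compose(sg1, sg2)

# 按边权重计算 PageRank,写回节点属性(对应 ES 中的 rank_flt 字段)
pagerank = nx.pagerank(global_graph, weight="weight")
nx.set_node_attributes(global_graph, pagerank, "pagerank")
print(sorted(pagerank.items(), key=lambda kv: kv[1], reverse=True))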

2. 完整调用链路与时序图

2.1 GraphRAG 构建完整时序图(自上而下)

sequenceDiagram
    autonumber
    participant User as 用户/前端
    participant API as kb_app.py<br/>run_graphrag()
    participant KBSvc as KnowledgebaseService
    participant TaskSvc as TaskService
    participant Redis as Redis Stream
    participant TaskExec as TaskExecutor<br/>do_handle_task()
    participant Index as graphrag.index<br/>run_graphrag_for_kb()
    participant Extractor as GraphExtractor<br/>实体关系抽取
    participant LLM as LLM Service
    participant Embed as Embedding Service
    participant ES as Elasticsearch
    participant Lock as Redis分布式锁
    participant Resolution as EntityResolution
    participant Community as CommunityReports

    User->>API: POST /api/v1/kb/run_graphrag<br/>{kb_id: "xxx"}
    
    rect rgb(240, 248, 255)
    Note over API,KBSvc: API层:请求校验与任务创建
    API->>KBSvc: get_by_id(kb_id)
    KBSvc-->>API: 返回 KB 对象
    API->>API: 校验权限<br/>check_kb_team_permission()
    API->>TaskSvc: 检查是否有正在运行的任务<br/>get_by_id(graphrag_task_id)
    TaskSvc-->>API: 任务状态
    API->>TaskSvc: queue_raptor_o_graphrag_tasks()<br/>创建GraphRAG任务
    TaskSvc->>Redis: XADD ragflow_svr<br/>{task_type: "graphrag", doc_ids: [...]}
    Redis-->>TaskSvc: 返回 task_id
    TaskSvc-->>API: task_id
    API-->>User: 202 Accepted<br/>{graphrag_task_id: "xxx"}
    end

    Note over User,Community: === 异步分界线 ===

    rect rgb(255, 250, 240)
    Note over TaskExec,Index: 任务执行层:从队列获取并执行
    TaskExec->>Redis: XREADGROUP ragflow_group<br/>BLOCK 5000 COUNT 1
    Redis-->>TaskExec: 返回 graphrag 任务
    TaskExec->>TaskExec: collect() 获取任务详情
    TaskExec->>Index: run_graphrag_for_kb()<br/>(doc_ids, with_resolution, with_community)
    end

    rect rgb(240, 255, 240)
    Note over Index,ES: 核心层1:子图生成(并行)
    loop 每个文档(最多4个并行)
        Index->>ES: chunk_list(doc_id)<br/>获取文档chunks
        ES-->>Index: 返回chunks列表
        Index->>Index: generate_subgraph(doc_id, chunks)
        Index->>Extractor: GraphExtractor()(doc_id, chunks)
        
        loop 每个chunk(最多10个并行)
            Extractor->>LLM: 调用LLM提取实体和关系<br/>GRAPH_EXTRACTION_PROMPT
            LLM-->>Extractor: 返回实体关系三元组<br/>(entity, relation, entity)
            Extractor->>LLM: 多次gleaning确保完整性<br/>CONTINUE_PROMPT (最多2次)
            LLM-->>Extractor: 补充实体关系
        end
        
        Extractor->>Extractor: _merge_nodes()<br/>合并同名实体
        Extractor->>Extractor: _merge_edges()<br/>合并同关系边
        Extractor-->>Index: 返回 (entities, relations)
        
        Index->>Index: 构建NetworkX子图<br/>subgraph.add_node/add_edge
        Index->>ES: 保存子图到ES<br/>knowledge_graph_kwd="subgraph"
    end
    end

    rect rgb(255, 245, 240)
    Note over Index,Lock: 核心层2:图谱合并(全局锁)
    Index->>Lock: spin_acquire("graphrag_task_{kb_id}")
    Lock-->>Index: 获取分布式锁
    
    loop 每个子图
        Index->>Index: merge_subgraph(subgraph)
        Index->>ES: 获取旧图<br/>get_graph(kb_id)
        ES-->>Index: 返回旧图(如果存在)
        Index->>Index: graph_merge(old_graph, subgraph)
        Index->>Index: PageRank计算<br/>nx.pagerank(new_graph)
        Index->>ES: set_graph(new_graph)<br/>保存节点和边到ES
    end
    
    Index->>Lock: release()
    end

    rect rgb(250, 240, 255)
    Note over Index,Community: 核心层3:实体消歧(可选)
    alt with_resolution == True
        Index->>Lock: spin_acquire("graphrag_task_{kb_id}")
        Lock-->>Index: 获取锁
        Index->>Resolution: EntityResolution()(graph, subgraph_nodes)
        
        loop 批量处理候选实体对(100个/批)
            Resolution->>Resolution: is_similarity(entity_a, entity_b)<br/>编辑距离判断
            Resolution->>LLM: 调用LLM判断是否同一实体<br/>ENTITY_RESOLUTION_PROMPT
            LLM-->>Resolution: 返回 Yes/No
            Resolution->>Resolution: 收集需要合并的实体对
        end
        
        Resolution->>Resolution: _merge_graph_nodes()<br/>合并同义实体
        Resolution->>Resolution: 重新计算PageRank
        Resolution-->>Index: 返回消歧后的图谱
        Index->>ES: set_graph(resolved_graph)
        Index->>Lock: release()
    end
    end

    rect rgb(255, 240, 245)
    Note over Index,Community: 核心层4:社区检测(可选)
    alt with_community == True
        Index->>Lock: spin_acquire("graphrag_task_{kb_id}")
        Lock-->>Index: 获取锁
        Index->>Community: CommunityReportsExtractor()(graph)
        Community->>Community: leiden.run(graph)<br/>Leiden算法社区检测
        
        loop 每个社区(并行)
            Community->>Community: 收集社区内实体和关系
            Community->>LLM: 生成社区摘要<br/>COMMUNITY_REPORT_PROMPT
            LLM-->>Community: 返回摘要报告<br/>{title, summary, findings, rating}
            Community->>ES: 保存社区报告<br/>knowledge_graph_kwd="community_report"
        end
        
        Community-->>Index: 返回社区报告列表
        Index->>Lock: release()
    end
    end

    Index-->>TaskExec: 返回执行结果<br/>{ok_docs, failed_docs, seconds}
    TaskExec->>TaskExec: set_progress(task_id, prog=1.0)
    TaskExec->>Redis: ACK消息

    Note over User: 用户可通过 /api/v1/kb/trace_graphrag 查询进度

时序图说明

  1. API层(步骤1-11):用户发起请求 → 校验权限 → 创建任务 → 返回task_id

    • 关键代码:api/apps/kb_app.py::run_graphrag()
    • 边界条件:检查是否有正在运行的任务,避免重复执行
    • 异常处理:无效KB ID、权限不足返回错误
  2. 任务队列层(步骤12-14):异步从Redis队列获取任务

    • 关键代码:rag/svr/task_executor.py::collect()、handle_task()、do_handle_task()
    • 并发控制:消费者组 + ACK机制,支持多worker
    • 超时设置:单个任务最多3小时(@timeout(60*60*3))
  3. 子图生成(步骤15-28):并行处理每个文档

    • 关键代码:graphrag/general/index.py::generate_subgraph()
    • 并发度:最多4个文档并行,每个文档内最多10个chunk并行
    • LLM调用:每个chunk调用1次主提取 + 最多2次gleaning
    • Token优化:chunk内容限制1024 tokens
    • 幂等性:通过ES检查does_graph_contains(doc_id)避免重复
  4. 图谱合并(步骤29-38):全局锁保证一致性

    • 关键代码:graphrag/general/index.py::merge_subgraph()
    • 分布式锁:Redis锁 graphrag_task_{kb_id},超时1200秒
    • PageRank:合并后重新计算全图的PageRank值
    • 增量更新:GraphChange 记录新增/删除/更新的节点和边
  5. 实体消歧(步骤39-50,可选):合并同义实体

    • 关键代码:graphrag/entity_resolution.py::EntityResolution()
    • 候选筛选:编辑距离 < len/2(英文)或字符集合交集比例 >= 0.8(中文),示意代码见本节列表之后
    • 批处理:100对/批,最多5个批次并行
    • LLM判断:仅对高相似度候选调用LLM确认
    • 超时策略:单批最多280秒,超时则跳过
  6. 社区检测(步骤51-62,可选):生成层级摘要

    • 关键代码:graphrag/general/community_reports_extractor.py::CommunityReportsExtractor()
    • Leiden算法:基于模块度的社区划分
    • 并行摘要:每个社区独立调用LLM生成摘要
    • 存储:社区报告独立存储,用于查询时召回
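
上文第 5 点的候选筛选规则可以用下面这个独立的小函数来理解(按本文描述写的示意实现,editdistance 库与阈值细节以 graphrag/entity_resolution.py 的 is_similarity() 为准):

import editdistance  # 第三方编辑距离库,仅用于示意


def is_candidate_pair(a: str, b: str) -> bool:
    """判断两个实体名是否相似到值得交给 LLM 做同义确认。"""
    if a.isascii() and b.isascii():
        # 英文:编辑距离小于较短名称长度的一半
        return editdistance.eval(a.lower(), b.lower()) < min(len(a), len(b)) / 2
    # 中文:字符集合的重合比例不低于 0.8
    overlap = len(set(a) & set(b))
    return overlap / max(1, min(len(set(a)), len(set(b)))) >= 0.8


# 只有返回 True 的实体对才会进入 ENTITY_RESOLUTION_PROMPT 的 LLM 判定
print(is_candidate_pair("RAGFlow", "RagFlow"))  # True
print(is_candidate_pair("清华大学", "清华"))      # True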

2.2 GraphRAG 检索时序图

sequenceDiagram
    autonumber
    participant User as 用户查询
    participant Chat as ChatAPI
    participant Search as KGSearch<br/>graphrag.search
    participant LLM as LLM Service
    participant Embed as Embedding Service
    participant ES as Elasticsearch
    
    User->>Chat: 发起对话<br/>"RAGFlow如何工作?"
    Chat->>Search: retrieval(question, kb_ids, max_token=8196)
    
    rect rgb(240, 248, 255)
    Note over Search,LLM: 步骤1:查询改写
    Search->>ES: get_entity_type2samples(kb_ids)<br/>获取实体类型样本
    ES-->>Search: {Person: ["张三", "李四"], Product: ["RAGFlow"]}
    Search->>LLM: query_rewrite(question, type_samples)<br/>MINIRAG_QUERY2KWD_PROMPT
    LLM-->>Search: {answer_type_keywords: ["Product"],<br/>entities_from_query: ["RAGFlow"]}
    end
    
    rect rgb(240, 255, 240)
    Note over Search,ES: 步骤2:实体召回(3路并行)
    par 路径1:关键词匹配实体
        Search->>Embed: encode(entities_from_query)
        Embed-->>Search: 查询向量
        Search->>ES: 向量检索<br/>knowledge_graph_kwd="entity"<br/>top 56
        ES-->>Search: ents_from_query<br/>{entity_kwd, rank_flt, n_hop_with_weight}
    and 路径2:类型过滤实体
        Search->>ES: 精确匹配<br/>entity_type_kwd=answer_type_keywords<br/>ORDER BY rank_flt DESC
        ES-->>Search: ents_from_types<br/>高PageRank实体
    and 路径3:关系召回
        Search->>Embed: encode(question)
        Embed-->>Search: 查询向量
        Search->>ES: 向量检索<br/>knowledge_graph_kwd="relation"<br/>top 56
        ES-->>Search: rels_from_txt<br/>{from_entity, to_entity, description}
    end
    end
    
    rect rgb(255, 250, 240)
    Note over Search,ES: 步骤3:N-hop路径扩展
    Search->>Search: 解析n_hop_with_weight<br/>提取多跳路径
    loop 每个路径
        Search->>Search: 计算路径得分<br/>sim / (2 + hop_distance)
        Search->>Search: 累加边的pagerank权重
    end
    end
    
    rect rgb(250, 240, 255)
    Note over Search,ES: 步骤4:融合排序
    Search->>Search: 实体得分融合<br/>P(E|Q) = sim * pagerank * type_boost
    Search->>Search: 关系得分融合<br/>sim * pagerank * (1 + type_boost)
    Search->>Search: Top-K选择<br/>entities: 6, relations: 6
    end
    
    rect rgb(255, 245, 240)
    Note over Search,ES: 步骤5:社区召回
    Search->>ES: 查询相关社区<br/>entities_kwd IN top_entities<br/>ORDER BY weight_flt DESC<br/>top 1
    ES-->>Search: community_reports
    end
    
    rect rgb(245, 255, 245)
    Note over Search,Chat: 步骤6:构建上下文
    Search->>Search: 格式化实体表格<br/>Entity, Score, Description
    Search->>Search: 格式化关系表格<br/>From Entity, To Entity, Description
    Search->>Search: 格式化社区报告<br/>Title, Summary, Findings
    Search-->>Chat: 返回增强上下文<br/>{content_with_weight: "..."}
    end
    
    Chat->>LLM: 调用LLM生成回答<br/>context + question
    LLM-->>Chat: 生成答案
    Chat-->>User: 返回回答

检索时序图说明

  1. 查询改写(步骤1-4)

    • 从图谱中采样实体类型(每类3个示例)
    • 使用LLM识别查询中的实体和期望答案类型
    • Prompt: MINIRAG_QUERY2KWD_PROMPT
    • 输出: answer_type_keywords(如Person/Product),entities_from_query(如"RAGFlow")
  2. 实体召回(步骤5-14,3路并行)

    • 路径1:向量检索 + 关键词匹配,召回相似实体(top 56)
    • 路径2:类型过滤 + PageRank排序,召回高权重实体
    • 路径3:向量检索关系描述,召回相关关系(top 56)
    • 字段: entity_kwd, rank_flt(PageRank),n_hop_with_weight(邻居路径)
  3. N-hop路径扩展(步骤15-19)

    • 解析 n_hop_with_weight: [{path: [A, B, C], weights: [0.8, 0.6]}]
    • 计算路径得分:sim / (2 + hop_distance) —— 距离越远衰减越大
    • 将N-hop关系补充到 rels_from_txt
  4. 融合排序(步骤20-23)

    • 实体得分: sim * pagerank * type_boost
      • 如果实体类型匹配 answer_type_keywords,则 type_boost = 2
    • 关系得分: sim * pagerank * (1 + type_boost)
      • 如果关系两端实体类型匹配,则增加boost
    • Top-K: entities取前6,relations取前6
  5. 社区召回(步骤24-25)

    • 查询条件:entities_kwd IN top_entities
    • 排序:weight_flt DESC(社区权重 = 社区内实体度数和)
    • 限制:top 1(最相关的社区报告)
  6. 构建上下文(步骤26-29)

    • 格式化为CSV表格(Pandas DataFrame)
    • Token限制:max_token = 8196,超出则截断
    • 返回结构:
      {
          "content_with_weight": """
          ---- Entities ----
          Entity,Score,Description
          RAGFlow,0.95,"开源RAG引擎"
      
          ---- Relations ----
          From Entity,To Entity,Score,Description
          RAGFlow,LLM,0.87,"RAGFlow使用LLM生成答案"
      
          ---- Community Report ----
          # RAG技术栈
          ## Content
          该社区包含RAG相关技术...
          """,
          "docnm_kwd": "Related content in Knowledge Graph",
          "similarity": 1.0
      }
      

性能优化点

  • 并行召回:3路召回并行执行,降低延迟
  • 向量缓存:LLM Cache机制缓存重复查询
  • 得分融合:结合语义相似度(sim)和图结构重要性(pagerank)
  • Token预算:动态截断,确保不超过上下文窗口
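
步骤4 的得分融合可以用下面的小函数近似表达(按本文公式写的示意,boost 取值与归一化细节以 graphrag/search.py 为准):

def entity_score(sim: float, pagerank: float, type_match: bool) -> float:
    """P(E|Q) ≈ 语义相似度 × PageRank × 类型加成(命中 answer_type_keywords 时加倍)。"""
    type_boost = 2.0 if type_match else 1.0
    return sim * pagerank * type_boost


def relation_score(sim: float, pagerank: float, type_boost: float) -> float:
    """关系得分 ≈ sim × pagerank × (1 + type_boost),两端实体类型命中时 type_boost > 0。"""
    return sim * pagerank * (1.0 + type_boost)


# Top-K:entities 与 relations 各取前 6
candidates = [("RAGFlow", entity_score(0.92, 0.08, True)),
              ("LLM", entity_score(0.75, 0.05, False))]
top_entities = sorted(candidates, key=lambda kv: kv[1], reverse=True)[:6]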

3. 核心模块调用链路详解

3.1 API层入口(api/apps/kb_app.py)

3.1.1 run_graphrag() 函数

# 文件:api/apps/kb_app.py
@manager.route("/run_graphrag", methods=["POST"])
@login_required
def run_graphrag():
    """
    GraphRAG构建入口API
    请求:POST /api/v1/kb/run_graphrag
    参数:{"kb_id": "xxx"}
    返回:{"graphrag_task_id": "xxx"}
    """
    req = request.json
    kb_id = req.get("kb_id", "")
    
    # 1. 校验知识库是否存在
    ok, kb = KnowledgebaseService.get_by_id(kb_id)
    if not ok:
        return get_error_data_result(message="Invalid Knowledgebase ID")
    
    # 2. 检查是否有正在运行的任务(防止重复执行)
    task_id = kb.graphrag_task_id
    if task_id:
        ok, task = TaskService.get_by_id(task_id)
        if task and task.progress not in [-1, 1]:  # -1=失败, 1=成功
            return get_error_data_result(
                message=f"Task {task_id} in progress. A Graph Task is already running."
            )
    
    # 3. 获取知识库下所有文档
    documents, _ = DocumentService.get_by_kb_id(
        kb_id=kb_id,
        page_number=0,
        items_per_page=0,  # 0表示获取全部
        orderby="create_time",
        desc=False,
    )
    if not documents:
        return get_error_data_result(message=f"No documents in Knowledgebase {kb_id}")
    
    # 4. 创建异步任务
    sample_document = documents[0]
    document_ids = [document["id"] for document in documents]
    task_id = queue_raptor_o_graphrag_tasks(
        doc=sample_document,
        ty="graphrag",
        priority=0,
        fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID,  # 占位ID,避免与普通文档任务冲突
        doc_ids=list(document_ids)
    )
    
    # 5. 更新知识库的graphrag_task_id字段
    KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": task_id})
    
    return get_json_result(data={"graphrag_task_id": task_id})

关键点

  • 幂等性检查:通过 kb.graphrag_task_id 和任务进度判断是否有正在运行的任务
  • 异步解耦:立即返回 task_id,任务在后台执行
  • 全量文档:一次性处理知识库下所有文档
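
调用方可以按如下方式触发构建并轮询进度(示意代码;鉴权方式与两个接口的返回字段以实际部署版本为准):

import time
import requests

BASE = "http://localhost:9380"                   # RAGFlow 服务地址,按部署修改
HEADERS = {"Authorization": "Bearer <API_KEY>"}  # 鉴权头为假设,以实际登录态/Key 为准

# 1. 触发 GraphRAG 构建
resp = requests.post(f"{BASE}/api/v1/kb/run_graphrag",
                     json={"kb_id": "xxx"}, headers=HEADERS)
task_id = resp.json()["data"]["graphrag_task_id"]

# 2. 轮询进度(progress:-1 失败,1 成功,其余为进行中)
for _ in range(360):
    trace = requests.get(f"{BASE}/api/v1/kb/trace_graphrag",
                         params={"kb_id": "xxx"}, headers=HEADERS).json()
    print(trace)
    time.sleep(10)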

3.2 任务队列层(api/db/services/document_service.py)

3.2.1 queue_raptor_o_graphrag_tasks() 函数

# 文件:api/db/services/document_service.py
def queue_raptor_o_graphrag_tasks(doc, ty, priority, fake_doc_id="", doc_ids=[]):
    """
    创建GraphRAG/Raptor任务并推入Redis队列
    
    参数:
        doc: 文档对象(用于获取配置)
        ty: 任务类型("graphrag" | "raptor")
        priority: 优先级(0-9)
        fake_doc_id: 占位文档ID(用于知识库级别的任务)
        doc_ids: 需要处理的文档ID列表
    
    返回:
        task_id: 任务ID
    """
    # 1. 获取分块配置(用于计算任务摘要)
    chunking_config = DocumentService.get_chunking_config(doc["id"])
    hasher = xxhash.xxh64()
    for field in sorted(chunking_config.keys()):
        hasher.update(str(chunking_config[field]).encode("utf-8"))
    
    # 2. 创建任务对象
    task = {
        "id": get_uuid(),
        "doc_id": fake_doc_id if fake_doc_id else doc["id"],
        "from_page": 100000000,  # 占位值
        "to_page": 100000000,
        "task_type": ty,
        "progress_msg": datetime.now().strftime("%H:%M:%S") + " created task " + ty,
        "begin_at": datetime.now(),
    }
    
    # 3. 计算任务摘要(用于去重)
    for field in ["doc_id", "from_page", "to_page"]:
        hasher.update(str(task.get(field, "")).encode("utf-8"))
    hasher.update(ty.encode("utf-8"))
    task["digest"] = hasher.hexdigest()
    
    # 4. 持久化到MySQL
    bulk_insert_into_db(Task, [task], True)  # True表示INSERT IGNORE
    
    # 5. 推入Redis队列
    if ty in ["graphrag", "raptor", "mindmap"]:
        task["doc_ids"] = doc_ids  # 携带文档ID列表
        DocumentService.begin2parse(doc["id"])  # 更新文档状态为"解析中"
    
    assert REDIS_CONN.queue_product(
        get_svr_queue_name(priority),  # 队列名:ragflow_svr_0 ~ ragflow_svr_9
        message=task
    ), "Can't access Redis. Please check the Redis' status."
    
    return task["id"]

关键点

  • 任务摘要(digest):基于配置和参数计算哈希,用于去重
  • INSERT IGNORE:避免重复创建相同任务
  • Redis Stream:使用XADD命令推入消息队列
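
queue_product 底层对应 Redis Stream 的 XADD。下面用 redis-py 演示等价的入队操作(仅为示意,消息体字段名与序列化方式以 REDIS_CONN 的封装为准):

import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

task = {"id": "task-uuid", "task_type": "graphrag", "doc_ids": ["doc1", "doc2"]}

# 相当于:XADD ragflow_svr_0 * message <json>
msg_id = r.xadd("ragflow_svr_0", {"message": json.dumps(task)})
print("queued:", msg_id)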

3.3 任务执行层(rag/svr/task_executor.py)

3.3.1 collect() 和 handle_task() 函数

# 文件:rag/svr/task_executor.py
async def collect():
    """从Redis队列获取任务"""
    global UNACKED_ITERATOR
    
    svr_queue_names = get_svr_queue_names()  # [ragflow_svr_0, ..., ragflow_svr_9]
    
    # 1. 先处理未ACK的消息(重试机制)
    if not UNACKED_ITERATOR:
        UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(
            svr_queue_names,
            SVR_CONSUMER_GROUP_NAME,  # ragflow_group
            CONSUMER_NAME  # 当前worker的唯一名称
        )
    try:
        redis_msg = next(UNACKED_ITERATOR)
    except StopIteration:
        # 2. 没有未ACK的消息,获取新消息
        for svr_queue_name in svr_queue_names:
            redis_msg = REDIS_CONN.queue_consumer(
                svr_queue_name,
                SVR_CONSUMER_GROUP_NAME,
                CONSUMER_NAME
            )
            if redis_msg:
                break
    
    if not redis_msg:
        return None, None
    
    msg = redis_msg.get_message()
    
    # 3. 特殊任务处理(graphrag/raptor/mindmap)
    if msg.get("doc_id", "") in [GRAPH_RAPTOR_FAKE_DOC_ID, CANVAS_DEBUG_DOC_ID]:
        task = msg
        if task["task_type"] in ["graphrag", "raptor", "mindmap"] and msg.get("doc_ids", []):
            task = TaskService.get_task(msg["id"], msg["doc_ids"])
            task["doc_ids"] = msg["doc_ids"]
    else:
        task = TaskService.get_task(msg["id"])
    
    # 4. 检查任务是否已取消
    if not task or has_canceled(task["id"]):
        redis_msg.ack()  # 确认消息
        return None, None
    
    return redis_msg, task


async def handle_task():
    """处理单个任务"""
    global DONE_TASKS, FAILED_TASKS
    
    redis_msg, task = await collect()
    if not task:
        await trio.sleep(5)
        return
    
    task_type = task["task_type"]
    
    try:
        logging.info(f"handle_task begin for task {json.dumps(task)}")
        CURRENT_TASKS[task["id"]] = copy.deepcopy(task)
        
        # 调用具体的任务处理函数
        await do_handle_task(task)
        
        DONE_TASKS += 1
        CURRENT_TASKS.pop(task["id"], None)
        logging.info(f"handle_task done for task {json.dumps(task)}")
    except Exception as e:
        FAILED_TASKS += 1
        CURRENT_TASKS.pop(task["id"], None)
        # 记录错误信息
        set_progress(task["id"], prog=-1, msg=f"[Exception]: {str(e)}")
        logging.exception(f"handle_task got exception for task {json.dumps(task)}")
    finally:
        # 记录任务日志(task_document_ids 仅在知识库级任务时有值,先初始化避免未定义)
        task_document_ids = []
        if task_type in ["graphrag", "raptor", "mindmap"]:
            task_document_ids = task["doc_ids"]
        PipelineOperationLogService.record_pipeline_operation(
            document_id=task["doc_id"],
            pipeline_id="",
            task_type=TASK_TYPE_TO_PIPELINE_TASK_TYPE.get(task_type),
            fake_document_ids=task_document_ids
        )
        
        redis_msg.ack()  # 确认消息

关键点

  • 消费者组(Consumer Group):支持多worker并行,自动负载均衡
  • 未ACK重试:处理失败的任务会重新投递
  • 全局任务追踪:CURRENT_TASKS 记录正在执行的任务
  • 异常容错:捕获异常后记录到任务进度,不影响其他任务
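
消费者组 + ACK 的读取逻辑大致如下(redis-py 示意,对应 REDIS_CONN.queue_consumer 的行为,细节以源码为准):

import redis

r = redis.Redis(decode_responses=True)
STREAM, GROUP, CONSUMER = "ragflow_svr_0", "ragflow_group", "worker-1"

# 创建消费者组(已存在时报 BUSYGROUP,可忽略)
try:
    r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
except redis.ResponseError:
    pass

# 阻塞 5 秒读取 1 条新消息;处理成功后 XACK,失败不 ACK 以便重新投递
msgs = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=1, block=5000)
for _stream, entries in msgs or []:
    for msg_id, fields in entries:
        print("got task:", fields)
        r.xack(STREAM, GROUP, msg_id)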

3.3.2 do_handle_task() - GraphRAG分支

# 文件:rag/svr/task_executor.py
@timeout(60*60*3, 1)  # 超时3小时
async def do_handle_task(task):
    task_type = task.get("task_type", "")
    
    # ... 省略其他任务类型 ...
    
    if task_type == "graphrag":
        start_ts = timer()
        
        # 1. 获取知识库配置
        kb_id = task["kb_id"]
        _, kb = KnowledgebaseService.get_by_id(kb_id)
        kb_parser_config = kb.parser_config or {}
        graphrag_conf = kb_parser_config.get("graphrag", {})
        
        # 2. 确定语言
        task_language = graphrag_conf.get("language", "English")
        
        # 3. 获取LLM和Embedding模型
        chat_model = LLMBundle(task["tenant_id"], LLMType.CHAT, kb.llm_id)
        embedding_model = LLMBundle(task["tenant_id"], LLMType.EMBEDDING, kb.embd_id)
        
        # 4. 进度回调函数
        def progress_callback(prog=None, msg=""):
            set_progress(task["id"], prog=prog, msg=msg)
        
        # 5. 读取配置项
        with_resolution = graphrag_conf.get("resolution", True)  # 是否实体消歧
        with_community = graphrag_conf.get("community", False)  # 是否社区检测
        
        # 6. 调用GraphRAG核心函数
        async with kg_limiter:  # 限流:最多并发kg_limiter个GraphRAG任务
            result = await run_graphrag_for_kb(
                row=task,
                doc_ids=task.get("doc_ids", []),
                language=task_language,
                kb_parser_config=kb_parser_config,
                chat_model=chat_model,
                embedding_model=embedding_model,
                callback=progress_callback,
                with_resolution=with_resolution,
                with_community=with_community,
            )
            logging.info(f"GraphRAG task result:\n{result}")
        
        progress_callback(prog=1.0, msg=f"Knowledge Graph done ({timer() - start_ts:.2f}s)")
        return

关键点

  • 超时保护:@timeout(60*60*3),单个任务最长运行3小时
  • 并发限流:kg_limiter 控制同时执行的GraphRAG任务数
  • 进度回调:实时更新任务进度,用户可通过API查询
  • 配置驱动:with_resolution、with_community 可按知识库配置开关

3.4 GraphRAG核心层(graphrag/general/index.py)

3.4.1 run_graphrag_for_kb() 主函数

# 文件:graphrag/general/index.py
async def run_graphrag_for_kb(
    row: dict,
    doc_ids: list[str],
    language: str,
    kb_parser_config: dict,
    chat_model,
    embedding_model,
    callback,
    *,
    with_resolution: bool = True,
    with_community: bool = True,
    max_parallel_docs: int = 4,
) -> dict:
    """
    知识库级别的GraphRAG构建
    
    参数:
        row: 任务对象,包含tenant_id, kb_id等
        doc_ids: 需要处理的文档ID列表
        language: 语言(English/Chinese)
        kb_parser_config: 解析配置
        chat_model/embedding_model: LLM和Embedding服务
        callback: 进度回调
        with_resolution: 是否实体消歧
        with_community: 是否社区检测
        max_parallel_docs: 最大并行文档数
    
    返回:
        {"ok_docs": [...], "failed_docs": [...], "total_docs": N, "total_chunks": M, "seconds": T}
    """
    tenant_id, kb_id = row["tenant_id"], row["kb_id"]
    start = trio.current_time()
    
    # 1. 如果未指定doc_ids,则获取知识库下所有文档
    if not doc_ids:
        docs, _ = DocumentService.get_by_kb_id(kb_id=kb_id, ...)
        doc_ids = [doc["id"] for doc in docs]
    
    doc_ids = list(dict.fromkeys(doc_ids))  # 去重
    if not doc_ids:
        callback(msg=f"[GraphRAG] kb:{kb_id} has no processable doc_id.")
        return {"ok_docs": [], "failed_docs": [], "total_docs": 0, ...}
    
    # 2. 加载所有文档的chunks(合并小chunk以节省LLM调用)
    def load_doc_chunks(doc_id: str) -> list[str]:
        chunks = []
        current_chunk = ""
        
        for d in settings.retriever.chunk_list(doc_id, tenant_id, [kb_id], ...):
            content = d["content_with_weight"]
            if num_tokens_from_string(current_chunk + content) < 1024:
                current_chunk += content  # 合并
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = content
        
        if current_chunk:
            chunks.append(current_chunk)
        
        return chunks
    
    all_doc_chunks: dict[str, list[str]] = {}
    total_chunks = 0
    for doc_id in doc_ids:
        chunks = load_doc_chunks(doc_id)
        all_doc_chunks[doc_id] = chunks
        total_chunks += len(chunks)
    
    # 3. 并行生成子图(每个文档一个子图)
    semaphore = trio.Semaphore(max_parallel_docs)  # 最多4个文档并行
    subgraphs: dict[str, nx.Graph] = {}
    failed_docs: list[tuple[str, str]] = []
    
    async def build_one(doc_id: str):
        """构建单个文档的子图"""
        chunks = all_doc_chunks.get(doc_id, [])
        if not chunks:
            return
        
        # 选择Light或General模式
        kg_extractor = (
            LightKGExt 
            if kb_parser_config.get("graphrag", {}).get("method") != "general" 
            else GeneralKGExt
        )
        
        async with semaphore:
            try:
                deadline = max(120, len(chunks) * 60 * 10)  # 超时时间:120s ~ chunks*10min
                callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} start (chunks={len(chunks)})")
                
                with trio.fail_after(deadline):
                    sg = await generate_subgraph(
                        kg_extractor,
                        tenant_id,
                        kb_id,
                        doc_id,
                        chunks,
                        language,
                        kb_parser_config.get("graphrag", {}).get("entity_types", []),
                        chat_model,
                        embedding_model,
                        callback,
                    )
                
                if sg:
                    subgraphs[doc_id] = sg
                    callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} done")
                else:
                    failed_docs.append((doc_id, "subgraph is empty"))
            except Exception as e:
                failed_docs.append((doc_id, repr(e)))
                callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} FAILED: {e!r}")
    
    # trio.open_nursery(): 并行执行多个异步任务
    async with trio.open_nursery() as nursery:
        for doc_id in doc_ids:
            nursery.start_soon(build_one, doc_id)
    
    ok_docs = [d for d in doc_ids if d in subgraphs]
    if not ok_docs:
        callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs generated, end.")
        return {"ok_docs": [], "failed_docs": failed_docs, ...}
    
    # 4. 合并子图到全局图谱(串行,加锁)
    kb_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value="batch_merge", timeout=1200)
    await kb_lock.spin_acquire()
    callback(msg=f"[GraphRAG] kb:{kb_id} merge lock acquired")
    
    try:
        final_graph = None
        for doc_id in ok_docs:
            sg = subgraphs[doc_id]
            new_graph = await merge_subgraph(
                tenant_id, kb_id, doc_id, sg, embedding_model, callback
            )
            if new_graph is not None:
                final_graph = new_graph  # 保留最新的图谱引用
        
        callback(msg=f"[GraphRAG] kb:{kb_id} merge finished, graph ready.")
    finally:
        kb_lock.release()
    
    # 5. 实体消歧(可选)
    if with_resolution and final_graph:
        await kb_lock.spin_acquire()
        
        subgraph_nodes = set()
        for sg in subgraphs.values():
            subgraph_nodes.update(set(sg.nodes()))
        
        await resolve_entities(
            final_graph, subgraph_nodes, tenant_id, kb_id, None,
            chat_model, embedding_model, callback
        )
        
        kb_lock.release()
    
    # 6. 社区检测(可选)
    if with_community and final_graph:
        await kb_lock.spin_acquire()
        
        await extract_community(
            final_graph, tenant_id, kb_id, None,
            chat_model, embedding_model, callback
        )
        
        kb_lock.release()
    
    now = trio.current_time()
    callback(msg=f"[GraphRAG] GraphRAG for KB {kb_id} done in {now - start:.2f}s.")
    return {
        "ok_docs": ok_docs,
        "failed_docs": failed_docs,
        "total_docs": len(doc_ids),
        "total_chunks": total_chunks,
        "seconds": now - start,
    }

关键点

  • Chunk合并:将小chunk合并到1024 tokens以内,减少LLM调用次数
  • 并行子图生成:trio.Semaphore(4) 控制最多4个文档并行
  • 超时策略:每个文档的超时时间 = max(120, chunks数 * 10分钟)
  • 分布式锁:合并、消歧、社区检测阶段加全局锁,保证一致性
  • 容错机制:单个文档失败不影响其他文档

3.5 实体关系抽取层(graphrag/general/graph_extractor.py)

3.5.1 GraphExtractor 核心逻辑

# 文件:graphrag/general/graph_extractor.py
class GraphExtractor(Extractor):
    """General模式的图谱抽取器(使用LLM)"""
    
    async def __call__(self, doc_id: str, chunks: list[str], callback: Callable | None = None):
        """
        从chunks中抽取实体和关系
        
        参数:
            doc_id: 文档ID
            chunks: 文档内容分块(已合并到1024 tokens以内)
            callback: 进度回调
        
        返回:
            (all_entities_data, all_relationships_data)
        """
        self.callback = callback
        start_ts = trio.current_time()
        
        # 1. 并行处理所有chunks(最多10个并行)
        async def extract_all(doc_id, chunks, max_concurrency=10):
            out_results = []
            limiter = trio.Semaphore(max_concurrency)
            
            async def worker(chunk_key_dp: tuple[str, str], idx: int, total: int):
                async with limiter:
                    await self._process_single_content(chunk_key_dp, idx, total, out_results)
            
            async with trio.open_nursery() as nursery:
                for i, ck in enumerate(chunks):
                    nursery.start_soon(worker, (doc_id, ck), i, len(chunks))
            
            return out_results
        
        out_results = await extract_all(doc_id, chunks)
        
        # 2. 合并所有chunk的结果
        maybe_nodes = defaultdict(list)  # {entity_name: [entity_data, ...]}
        maybe_edges = defaultdict(list)  # {(src, dst): [relation_data, ...]}
        sum_token_count = 0
        
        for m_nodes, m_edges, token_count in out_results:
            for k, v in m_nodes.items():
                maybe_nodes[k].extend(v)
            for k, v in m_edges.items():
                maybe_edges[tuple(sorted(k))].extend(v)
            sum_token_count += token_count
        
        callback(msg=f"Entities extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens")
        
        # 3. 合并同名实体(并行)
        all_entities_data = []
        async with trio.open_nursery() as nursery:
            for en_nm, ents in maybe_nodes.items():
                nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data)
        
        # 4. 合并同关系边(并行)
        all_relationships_data = []
        async with trio.open_nursery() as nursery:
            for (src, tgt), rels in maybe_edges.items():
                nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data)
        
        return all_entities_data, all_relationships_data


    async def _process_single_content(self, chunk_key_dp: tuple[str, str], chunk_seq: int, num_chunks: int, out_results):
        """处理单个chunk,提取实体和关系"""
        chunk_key = chunk_key_dp[0]  # doc_id
        content = chunk_key_dp[1]     # chunk文本
        token_count = 0
        
        # 1. 构建主提取Prompt
        variables = {
            **self._prompt_variables,  # tuple_delimiter, record_delimiter等
            "input_text": content,
        }
        hint_prompt = perform_variable_replacements(GRAPH_EXTRACTION_PROMPT, variables=variables)
        
        # 2. 第一次LLM调用(主提取)
        async with chat_limiter:  # 限流:避免LLM并发过高
            response = await trio.to_thread.run_sync(
                lambda: self._chat(hint_prompt, [{"role": "user", "content": "Output:"}], {})
            )
        token_count += num_tokens_from_string(hint_prompt + response)
        results = response or ""
        history = [
            {"role": "system", "content": hint_prompt},
            {"role": "user", "content": response}
        ]
        
        # 3. Gleaning:多次迭代确保完整性(最多2次)
        for i in range(self._max_gleanings):  # _max_gleanings = 2
            history.append({"role": "user", "content": CONTINUE_PROMPT})
            
            async with chat_limiter:
                response = await trio.to_thread.run_sync(lambda: self._chat("", history, {}))
            
            token_count += num_tokens_from_string(response)
            results += response or ""
            
            # 最后一次迭代不需要判断是否继续
            if i >= self._max_gleanings - 1:
                break
            
            # 4. 询问LLM是否还有遗漏的实体
            history.append({"role": "assistant", "content": response})
            history.append({"role": "user", "content": LOOP_PROMPT})  # "还有吗?回答YES或NO"
            
            async with chat_limiter:
                continuation = await trio.to_thread.run_sync(lambda: self._chat("", history))
            
            token_count += num_tokens_from_string(continuation)
            if continuation != "Y":  # LLM回答NO,停止gleaning
                break
            
            history.append({"role": "assistant", "content": "Y"})
        
        # 5. 解析LLM返回的实体和关系
        records = split_string_by_multi_markers(results, ["##", "<|COMPLETE|>"])
        rcds = []
        for record in records:
            match = re.search(r"\((.*)\)", record)  # 提取括号内内容
            if match:
                rcds.append(match.group(1))
        
        maybe_nodes, maybe_edges = self._entities_and_relations(chunk_key, rcds, "<|>")
        out_results.append((maybe_nodes, maybe_edges, token_count))
        
        if self.callback:
            self.callback(
                0.5 + 0.1 * len(out_results) / num_chunks,
                msg=f"Entities extraction of chunk {chunk_seq} {len(out_results)}/{num_chunks} done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges"
            )


    def _entities_and_relations(self, chunk_key: str, records: list, tuple_delimiter: str):
        """解析LLM返回的记录,分离实体和关系"""
        maybe_nodes = defaultdict(list)
        maybe_edges = defaultdict(list)
        ent_types = [t.lower() for t in self._entity_types]  # 允许的实体类型
        
        for record in records:
            # record示例: "实体<|>RAGFlow<|>Product<|>开源RAG引擎<|>10"
            # 或: "关系<|>RAGFlow<|>LLM<|>uses<|>使用LLM生成答案<|>8"
            record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])
            
            # 尝试解析为实体
            if_entities = handle_single_entity_extraction(record_attributes, chunk_key)
            if if_entities and if_entities.get("entity_type", "").lower() in ent_types:
                maybe_nodes[if_entities["entity_name"]].append(if_entities)
                continue
            
            # 尝试解析为关系
            if_relation = handle_single_relationship_extraction(record_attributes, chunk_key)
            if if_relation:
                maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(if_relation)
        
        return dict(maybe_nodes), dict(maybe_edges)


    async def _merge_nodes(self, entity_name: str, entities: list[dict], all_entities_data):
        """合并同名实体"""
        if not entities:
            return
        
        # 1. 统计实体类型(选择出现最多的)
        entity_type = sorted(
            Counter([dp["entity_type"] for dp in entities]).items(),
            key=lambda x: x[1],
            reverse=True
        )[0][0]
        
        # 2. 合并描述(用<SEP>分隔)
        description = "<SEP>".join(sorted(set([dp["description"] for dp in entities])))
        
        # 3. 收集来源文档
        already_source_ids = flat_uniq_list(entities, "source_id")
        
        # 4. 如果描述过多(>12个),调用LLM总结
        description = await self._handle_entity_relation_summary(entity_name, description)
        
        node_data = {
            "entity_name": entity_name,
            "entity_type": entity_type,
            "description": description,
            "source_id": already_source_ids,
        }
        all_entities_data.append(node_data)


    async def _merge_edges(self, src_id: str, tgt_id: str, edges_data: list[dict], all_relationships_data):
        """合并同关系边"""
        if not edges_data:
            return
        
        # 1. 累加权重
        weight = sum([edge["weight"] for edge in edges_data])
        
        # 2. 合并描述
        description = "<SEP>".join(sorted(set([edge["description"] for edge in edges_data])))
        description = await self._handle_entity_relation_summary(f"{src_id} -> {tgt_id}", description)
        
        # 3. 合并关键词和来源
        keywords = flat_uniq_list(edges_data, "keywords")
        source_id = flat_uniq_list(edges_data, "source_id")
        
        edge_data = {
            "src_id": src_id,
            "tgt_id": tgt_id,
            "description": description,
            "keywords": keywords,
            "weight": weight,
            "source_id": source_id
        }
        all_relationships_data.append(edge_data)


    async def _handle_entity_relation_summary(self, entity_or_relation_name: str, description: str) -> str:
        """如果描述过多(>12个),调用LLM总结"""
        description_list = description.split("<SEP>")
        if len(description_list) <= 12:
            return description
        
        # 调用LLM生成摘要
        prompt = SUMMARIZE_DESCRIPTIONS_PROMPT.format(
            entity_name=entity_or_relation_name,
            description_list=description_list,
            language=self._language
        )
        
        async with chat_limiter:
            summary = await trio.to_thread.run_sync(
                self._chat, "", [{"role": "user", "content": prompt}]
            )
        
        return summary

关键点

  • 并发处理:10个chunk并行,每个chunk独立调用LLM
  • Gleaning机制:每个chunk 1次主提取 + 最多2次补充提取,另有询问是否继续的轻量判断调用
  • LLM Cache:_chat() 内置缓存,相同输入直接返回缓存结果
  • 描述合并:同名实体的描述用<SEP>分隔,超过12个则调用LLM总结
  • 权重累加:同关系边的权重相加(用于PageRank计算)

Prompt示例(GRAPH_EXTRACTION_PROMPT)

-Goal-
Given a text document that is potentially relevant to this activity and a list of entity types, identify all entities of those types from the text and all relationships among the identified entities.

-Steps-
1. Identify all entities. For each identified entity, extract the following information:
- entity_name: Name of the entity, capitalized
- entity_type: One of the following types: [Person,Organization,Location,Event,Category]
- entity_description: Comprehensive description of the entity's attributes and activities
Format each entity as ("entity"<|>entity_name<|>entity_type<|>entity_description)

2. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are *clearly related* to each other.
For each pair of related entities, extract the following information:
- source_entity: name of the source entity, as identified in step 1
- target_entity: name of the target entity, as identified in step 1
- relationship_description: explanation as to why you think the source entity and the target entity are related to each other
- relationship_strength: an integer score between 1 to 10, indicating strength of the relationship between the source entity and target entity
Format each relationship as ("relationship"<|>source_entity<|>target_entity<|>relationship_description<|>relationship_strength)

3. Return output in {language} as a single list of all the entities and relationships identified in steps 1 and 2. Use **##** as the list delimiter.

4. If you have to translate, just translate the descriptions, nothing else!

5. When finished, output <|COMPLETE|>

######################
-Examples-
######################
{examples}

######################
-Real Data-
######################
Entity_types: {entity_types}
Text: {input_text}
######################
Output:

LLM返回示例

("entity"<|>RAGFlow<|>Product<|>An open-source RAG engine for building AI applications)##
("entity"<|>LLM<|>Technology<|>Large Language Model used for natural language processing)##
("entity"<|>Elasticsearch<|>Technology<|>Search and analytics engine)##
("relationship"<|>RAGFlow<|>LLM<|>RAGFlow uses LLM to generate responses<|>9)##
("relationship"<|>RAGFlow<|>Elasticsearch<|>RAGFlow stores data in Elasticsearch<|>8)##
<|COMPLETE|>
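
下面的独立小例子演示如何按 ## 与 <|> 把上述返回解析回实体/关系记录(思路与 _entities_and_relations() 一致,仅为示意):

import re

raw = ('("entity"<|>RAGFlow<|>Product<|>An open-source RAG engine)##'
       '("relationship"<|>RAGFlow<|>LLM<|>RAGFlow uses LLM to generate responses<|>9)##<|COMPLETE|>')

entities, relations = [], []
for record in raw.split("##"):
    m = re.search(r"\((.*)\)", record)  # 取括号内的内容
    if not m:
        continue
    fields = m.group(1).split("<|>")
    if fields[0] == '"entity"':
        entities.append({"name": fields[1], "type": fields[2], "description": fields[3]})
    elif fields[0] == '"relationship"':
        relations.append({"src": fields[1], "tgt": fields[2],
                          "description": fields[3], "weight": int(fields[4])})

print(entities)
print(relations)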

4. 模块交互矩阵

| 调用方 | 被调方 | 调用方式 | 数据流 | 错误语义 | 一致性要求 |
| --- | --- | --- | --- | --- | --- |
| kb_app.run_graphrag() | KnowledgebaseService | 同步函数调用 | KB配置 | 返回错误码 | 读已提交 |
| kb_app.run_graphrag() | queue_raptor_o_graphrag_tasks() | 同步函数调用 | 任务参数 | 抛出异常 | 强一致性(MySQL) |
| TaskExecutor | Redis Stream | 异步队列消费 | 任务消息 | ACK/NACK | 至少一次投递 |
| run_graphrag_for_kb() | generate_subgraph() | 异步并行调用 | chunks → subgraph | 单文档失败不影响其他 | 最终一致性 |
| GraphExtractor | LLM Service | 异步并发调用 | chunk → entities/relations | 重试3次后放弃 | 无状态 |
| merge_subgraph() | Elasticsearch | 异步批量写入 | 节点/边数据 | 抛出异常回滚 | 最终一致性 |
| EntityResolution | Redis分布式锁 | spin_acquire() | 锁状态 | 超时返回失败 | 线性一致性 |
| KGSearch.retrieval() | Elasticsearch | 异步并发查询 | 查询 → 实体/关系/社区 | 降级返回空 | 弱一致性 |

5. 关键设计与权衡

5.1 一致性模型

分布式锁(Redis)

  • 使用场景:图谱合并、实体消歧、社区检测
  • 实现:RedisDistributedLock,通过 spin_acquire() 自旋获取
  • 超时:1200秒(20分钟)
  • 权衡:牺牲可用性换取一致性,避免并发修改导致的图谱冲突
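
RedisDistributedLock 的语义可以用下面的极简实现来理解(示意,非 RAGFlow 源码;真实实现还需处理锁续期、原子释放等细节):

import time
import uuid
import redis

r = redis.Redis()

def spin_acquire(key: str, timeout: int = 1200, poll: float = 1.0) -> str:
    """自旋获取锁:SET key value NX EX timeout,成功后返回锁值。"""
    value = uuid.uuid4().hex
    while not r.set(key, value, nx=True, ex=timeout):
        time.sleep(poll)
    return value

def release(key: str, value: str) -> None:
    """仅当锁仍由自己持有时删除(生产环境应使用 Lua 脚本保证原子性)。"""
    if r.get(key) == value.encode():
        r.delete(key)

lock_value = spin_acquire("graphrag_task_kb123")
try:
    pass  # 图谱合并 / 实体消歧 / 社区检测
finally:
    release("graphrag_task_kb123", lock_value)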

最终一致性(Elasticsearch)

  • 使用场景:子图存储、实体/关系/社区索引
  • 实现:批量写入(bulk_size=4)
  • 权衡:允许短暂不一致,提升写入吞吐量

幂等性设计

  • 任务去重:通过digest(xxhash)避免重复任务
  • 子图覆盖:通过does_graph_contains(doc_id)检查,存在则跳过
  • ACK机制:Redis Stream确保消息至少被处理一次

5.2 并发控制

层级并发

  1. 知识库级别:最多1个GraphRAG任务(通过KB锁)
  2. 文档级别:最多4个文档并行(Semaphore(4))
  3. Chunk级别:最多10个chunk并行(Semaphore(10))
  4. LLM调用:全局限流 chat_limiter(避免LLM服务过载)
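
chat_limiter、kg_limiter 这类全局限流器在 trio 下通常就是一个容量限制器,示意如下(并发上限数值为假设,实际取自服务配置):

import trio

chat_limiter = trio.CapacityLimiter(10)  # 同时进行的 LLM 调用数(示意值)
kg_limiter = trio.CapacityLimiter(2)     # 同时执行的 GraphRAG 任务数(示意值)

async def call_llm(prompt: str) -> str:
    async with chat_limiter:             # 超出容量的调用在此排队等待
        return await trio.to_thread.run_sync(lambda: "...LLM response...")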

超时策略

  • 任务级别:3小时(@timeout(60*60*3))
  • 文档级别:max(120, chunks数 * 10分钟)
  • LLM调用:20分钟(@timeout(60*20))
  • 社区摘要:120秒/社区(@timeout(120))

5.3 性能关键路径

瓶颈分析

  1. LLM调用:占总时间 > 90%
    • 优化:Chunk合并(减少调用次数)、并发调用、LLM Cache
  2. 图谱合并:全局锁串行执行
    • 优化:增量合并(仅更新变更的节点/边)
  3. Elasticsearch写入:批量写入
    • 优化:bulk_size=4,避免单次写入过大

Token优化

  • Chunk合并:小chunk合并到1024 tokens以内
  • 描述总结:超过12个描述调用LLM总结(避免context过长)
  • 上下文截断:检索时max_token=8196,超出则截断

5.4 配置项

| 配置项 | 默认值 | 说明 |
| --- | --- | --- |
| method | "light" | 模式选择:"light" 或 "general" |
| resolution | True | 是否实体消歧 |
| community | False | 是否社区检测 |
| language | "English" | 语言(影响Prompt) |
| entity_types | ["Person", "Organization", …] | 实体类型白名单 |
| max_gleanings | 2 | Gleaning最大次数 |
| max_parallel_docs | 4 | 最大并行文档数 |
| max_concurrent_chunks | 10 | 最大并行chunk数 |

6. 最佳实践与性能建议

6.1 选择合适的模式

| 场景 | 推荐模式 | 原因 |
| --- | --- | --- |
| 文档 < 100 篇,预算有限 | Light | 不调用LLM,成本低 |
| 文档 > 1000 篇,高精度要求 | General + resolution + community | 完整图谱,支持复杂查询 |
| 实时性要求高 | Light | 构建速度快(< 1分钟) |
| 多语言混合文档 | General | LLM对多语言支持更好 |

6.2 实体类型设计

  • 通用领域:Person, Organization, Location, Event, Category
  • 金融领域:Company, Stock, Indicator, Policy, Market
  • 医疗领域:Disease, Drug, Symptom, Treatment, Organ

建议

  • 类型数量控制在 5-10 个
  • 避免过细粒度(如将Person拆分为CEO, Engineer等)

6.3 Prompt工程

  • Few-shot示例:在Prompt中提供2-3个示例,可将LLM抽取准确率提升20-30%
  • 结构化输出:使用固定分隔符(##, <|>),方便解析
  • 语言一致性:Prompt和文档使用相同语言

6.4 成本优化

LLM成本(以GPT-4为例):

  • Light模式:$0(不调用LLM)
  • General模式:每1000 chunks约$5-10
  • 实体消歧:+20%(仅对候选对调用)
  • 社区摘要:+10%(每个社区1次调用)

优化建议

  1. 使用更便宜的模型(如GPT-3.5-turbo)
  2. 增大chunk大小(减少调用次数)
  3. 缓存重复查询(LLM Cache命中率可达30-50%)

7. 监控与可观测性

7.1 关键指标

任务级别

  • graphrag_task_duration_seconds:任务总耗时
  • graphrag_task_success_rate:任务成功率
  • graphrag_documents_processed:处理的文档数
  • graphrag_chunks_processed:处理的chunk数

LLM级别

  • llm_call_count:LLM调用次数
  • llm_token_usage:Token消耗
  • llm_cache_hit_rate:缓存命中率

图谱级别

  • graph_entities_count:实体数量
  • graph_relations_count:关系数量
  • graph_communities_count:社区数量
  • graph_merge_duration_seconds:合并耗时
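
这些指标并非 RAGFlow 内置的导出项,下面给出一个基于 prometheus_client 的采集示意(指标名沿用上文列表,标签与埋点位置为假设):

from prometheus_client import Counter, Histogram, start_http_server

GRAPHRAG_TASK_DURATION = Histogram("graphrag_task_duration_seconds", "GraphRAG 任务耗时", ["kb_id"])
LLM_CALL_COUNT = Counter("llm_call_count", "LLM 调用次数", ["stage"])

def record_task(kb_id: str, seconds: float, llm_calls: int) -> None:
    GRAPHRAG_TASK_DURATION.labels(kb_id=kb_id).observe(seconds)
    LLM_CALL_COUNT.labels(stage="extraction").inc(llm_calls)

start_http_server(8000)  # 暴露 /metrics 端口供 Prometheus 抓取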

7.2 日志追踪

进度日志(通过callback实时输出):

[GraphRAG] build_subgraph doc:xxx start (chunks=50, timeout=3000s)
[GraphRAG] Entities extraction of chunk 10/50 done, 25 nodes, 18 edges
[GraphRAG] build_subgraph doc:xxx done
[GraphRAG] kb:yyy merge lock acquired
[GraphRAG] kb:yyy merge finished, graph ready
[GraphRAG] Resolved 150 candidate pairs, 45 of them are selected to merge
[GraphRAG] Community reports done in 120.5s, used tokens: 50000
[GraphRAG] GraphRAG for KB yyy done in 1200.50 seconds

8. 故障排查与常见问题

8.1 任务失败

问题:任务进度显示 -1(失败)

排查

  1. 查看任务日志:/api/v1/kb/trace_graphrag?kb_id=xxx
  2. 检查LLM服务是否正常
  3. 检查Elasticsearch连接
  4. 检查Redis分布式锁是否超时

常见原因

  • LLM服务超时或返回错误
  • Elasticsearch磁盘满
  • Redis锁超时(20分钟)

8.2 图谱质量问题

问题:实体识别不准确

优化

  1. 调整entity_types:针对领域定制
  2. 使用Few-shot示例:在Prompt中添加示例
  3. 切换更强的LLM模型(如GPT-4)

问题:实体过多重复

优化

  1. 开启resolution=True
  2. 调整相似度阈值(is_similarity())

问题:关系抽取遗漏

优化

  1. 增加max_gleanings(默认2)
  2. 减小chunk大小(确保单个chunk不超过1024 tokens)