RAGFlow-05-GraphRAG模块
0. 模块概览
0.1 职责与定位
GraphRAG 模块是 RAGFlow 的图谱增强检索引擎,通过构建知识图谱增强 RAG 的语义理解能力。主要职责包括:
- 实体识别与抽取:从文档中识别实体(人名、地名、组织、概念)
- 关系抽取:识别实体间的关系(同义、上下位、因果、时序)
- 实体消歧(Entity Resolution):合并同一实体的不同表述
- 图谱构建:将实体与关系存储为图结构(NetworkX)
- 图谱检索:基于图谱结构扩展查询,提升召回
- 社区检测:Leiden 算法划分实体社区,生成层级摘要
0.2 两种模式
Light 模式(轻量级):
- 基于关键词与词向量的简单图谱
- 不使用 LLM,成本低、速度快
- 适合场景:小规模文档、预算有限
General 模式(完整版):
- 使用 LLM 进行实体识别、关系抽取、实体消歧
- 支持社区检测与层级摘要
- 适合场景:大规模文档、高精度要求
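两种模式通过知识库的 parser_config 切换。下面给出一个配置片段示意(字段名与取值依据本文 5.4 节配置项表,仅作参考,具体结构以实际版本为准):

```python
# GraphRAG 相关配置的一个示意片段(字段名依据本文 5.4 节配置项表)
kb_parser_config = {
    "graphrag": {
        "method": "general",       # "light"(不调用LLM)或 "general"(LLM驱动)
        "resolution": True,        # 是否执行实体消歧
        "community": False,        # 是否执行社区检测与摘要
        "language": "Chinese",     # 抽取与摘要Prompt的输出语言
        "entity_types": ["Person", "Organization", "Location", "Event", "Category"],
    }
}
```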
1. 整体服务架构图
1.1 模块层级架构
flowchart TB
subgraph "API层"
API["/api/v1/kb/run_graphrag<br/>kb_app.py::run_graphrag()"]
end
subgraph "服务层"
KBSvc[KnowledgebaseService<br/>管理知识库]
TaskSvc[TaskService<br/>管理任务]
DocSvc[DocumentService<br/>管理文档]
end
subgraph "任务队列层"
RedisQueue[Redis Stream<br/>ragflow_svr队列]
TaskExec[TaskExecutor<br/>task_executor.py]
end
subgraph "GraphRAG核心层"
Index[graphrag.general.index<br/>run_graphrag_for_kb()]
Extractor[GraphExtractor<br/>实体关系抽取]
Resolution[EntityResolution<br/>实体消歧]
Community[CommunityReportsExtractor<br/>社区检测与摘要]
Search[KGSearch<br/>图谱检索]
end
subgraph "存储层"
MySQL[(MySQL<br/>任务/文档元数据)]
ES[(Elasticsearch<br/>图谱数据/Chunks)]
Redis[(Redis<br/>分布式锁/缓存)]
end
subgraph "外部服务"
LLM[LLM Service<br/>实体识别/关系抽取/摘要]
Emb[Embedding Service<br/>向量化/相似度计算]
end
API --> KBSvc
API --> TaskSvc
API --> DocSvc
TaskSvc --> RedisQueue
RedisQueue --> TaskExec
TaskExec --> Index
Index --> Extractor
Index --> Resolution
Index --> Community
Extractor --> LLM
Extractor --> Emb
Resolution --> LLM
Resolution --> Emb
Community --> LLM
Index --> ES
Index --> Redis
Search --> ES
KBSvc --> MySQL
TaskSvc --> MySQL
DocSvc --> MySQL
架构说明:
- API层:接收用户请求,校验权限,创建任务
- 服务层:管理知识库、任务、文档的业务逻辑
- 任务队列层:异步任务调度,解耦请求与执行
- GraphRAG核心层:图谱构建的核心算法实现
- 存储层:持久化任务状态、图谱数据、向量索引
- 外部服务:LLM和Embedding模型调用
1.2 数据流架构图
flowchart LR
subgraph "输入"
Docs[知识库文档]
end
subgraph "分块处理"
Chunks[Chunks<br/>content_with_weight]
end
subgraph "模式选择"
LightMode[Light模式<br/>关键词+共现]
GeneralMode[General模式<br/>LLM驱动]
end
subgraph "子图生成"
SubGraph1[Doc1 SubGraph]
SubGraph2[Doc2 SubGraph]
SubGraphN[DocN SubGraph]
end
subgraph "图谱合并"
MergeGraph[全局图谱<br/>NetworkX Graph]
PageRank[PageRank计算]
end
subgraph "后处理"
EntityRes[实体消歧<br/>合并同义实体]
CommDetect[社区检测<br/>Leiden算法]
CommReport[社区摘要<br/>LLM生成]
end
subgraph "存储"
ESEntity[(ES: entity节点)]
ESRelation[(ES: relation边)]
ESCommunity[(ES: community_report)]
end
subgraph "检索"
Query[用户查询]
KGRetrieval[图谱检索<br/>实体+关系+社区]
EnhancedResult[增强结果]
end
Docs --> Chunks
Chunks --> LightMode
Chunks --> GeneralMode
LightMode --> SubGraph1
GeneralMode --> SubGraph1
GeneralMode --> SubGraph2
GeneralMode --> SubGraphN
SubGraph1 --> MergeGraph
SubGraph2 --> MergeGraph
SubGraphN --> MergeGraph
MergeGraph --> PageRank
PageRank --> EntityRes
EntityRes --> CommDetect
CommDetect --> CommReport
EntityRes --> ESEntity
EntityRes --> ESRelation
CommReport --> ESCommunity
Query --> KGRetrieval
ESEntity --> KGRetrieval
ESRelation --> KGRetrieval
ESCommunity --> KGRetrieval
KGRetrieval --> EnhancedResult
数据流说明:
- 输入阶段:文档分块为Chunks
- 模式选择:根据配置选择Light或General模式
- 并行处理:每个文档并行生成子图
- 图谱合并:子图合并为全局图谱并计算PageRank
- 后处理:实体消歧、社区检测、生成摘要
- 存储索引:图谱数据存入Elasticsearch
- 检索增强:查询时从图谱中召回相关内容
2. 完整调用链路与时序图
2.1 GraphRAG 构建完整时序图(自上而下)
sequenceDiagram
autonumber
participant User as 用户/前端
participant API as kb_app.py<br/>run_graphrag()
participant KBSvc as KnowledgebaseService
participant TaskSvc as TaskService
participant Redis as Redis Stream
participant TaskExec as TaskExecutor<br/>do_handle_task()
participant Index as graphrag.index<br/>run_graphrag_for_kb()
participant Extractor as GraphExtractor<br/>实体关系抽取
participant LLM as LLM Service
participant Embed as Embedding Service
participant ES as Elasticsearch
participant Lock as Redis分布式锁
participant Resolution as EntityResolution
participant Community as CommunityReports
User->>API: POST /api/v1/kb/run_graphrag<br/>{kb_id: "xxx"}
rect rgb(240, 248, 255)
Note over API,KBSvc: API层:请求校验与任务创建
API->>KBSvc: get_by_id(kb_id)
KBSvc-->>API: 返回 KB 对象
API->>API: 校验权限<br/>check_kb_team_permission()
API->>TaskSvc: 检查是否有正在运行的任务<br/>get_by_id(graphrag_task_id)
TaskSvc-->>API: 任务状态
API->>TaskSvc: queue_raptor_o_graphrag_tasks()<br/>创建GraphRAG任务
TaskSvc->>Redis: XADD ragflow_svr<br/>{task_type: "graphrag", doc_ids: [...]}
Redis-->>TaskSvc: 返回 task_id
TaskSvc-->>API: task_id
API-->>User: 202 Accepted<br/>{graphrag_task_id: "xxx"}
end
Note over User,Community: === 异步分界线 ===
rect rgb(255, 250, 240)
Note over TaskExec,Index: 任务执行层:从队列获取并执行
TaskExec->>Redis: XREADGROUP ragflow_group<br/>BLOCK 5000 COUNT 1
Redis-->>TaskExec: 返回 graphrag 任务
TaskExec->>TaskExec: collect() 获取任务详情
TaskExec->>Index: run_graphrag_for_kb()<br/>(doc_ids, with_resolution, with_community)
end
rect rgb(240, 255, 240)
Note over Index,ES: 核心层1:子图生成(并行)
loop 每个文档(最多4个并行)
Index->>ES: chunk_list(doc_id)<br/>获取文档chunks
ES-->>Index: 返回chunks列表
Index->>Index: generate_subgraph(doc_id, chunks)
Index->>Extractor: GraphExtractor()(doc_id, chunks)
loop 每个chunk(最多10个并行)
Extractor->>LLM: 调用LLM提取实体和关系<br/>GRAPH_EXTRACTION_PROMPT
LLM-->>Extractor: 返回实体关系三元组<br/>(entity, relation, entity)
Extractor->>LLM: 多次gleaning确保完整性<br/>CONTINUE_PROMPT (最多2次)
LLM-->>Extractor: 补充实体关系
end
Extractor->>Extractor: _merge_nodes()<br/>合并同名实体
Extractor->>Extractor: _merge_edges()<br/>合并同关系边
Extractor-->>Index: 返回 (entities, relations)
Index->>Index: 构建NetworkX子图<br/>subgraph.add_node/add_edge
Index->>ES: 保存子图到ES<br/>knowledge_graph_kwd="subgraph"
end
end
rect rgb(255, 245, 240)
Note over Index,Lock: 核心层2:图谱合并(全局锁)
Index->>Lock: spin_acquire("graphrag_task_{kb_id}")
Lock-->>Index: 获取分布式锁
loop 每个子图
Index->>Index: merge_subgraph(subgraph)
Index->>ES: 获取旧图<br/>get_graph(kb_id)
ES-->>Index: 返回旧图(如果存在)
Index->>Index: graph_merge(old_graph, subgraph)
Index->>Index: PageRank计算<br/>nx.pagerank(new_graph)
Index->>ES: set_graph(new_graph)<br/>保存节点和边到ES
end
Index->>Lock: release()
end
rect rgb(250, 240, 255)
Note over Index,Community: 核心层3:实体消歧(可选)
alt with_resolution == True
Index->>Lock: spin_acquire("graphrag_task_{kb_id}")
Lock-->>Index: 获取锁
Index->>Resolution: EntityResolution()(graph, subgraph_nodes)
loop 批量处理候选实体对(100个/批)
Resolution->>Resolution: is_similarity(entity_a, entity_b)<br/>编辑距离判断
Resolution->>LLM: 调用LLM判断是否同一实体<br/>ENTITY_RESOLUTION_PROMPT
LLM-->>Resolution: 返回 Yes/No
Resolution->>Resolution: 收集需要合并的实体对
end
Resolution->>Resolution: _merge_graph_nodes()<br/>合并同义实体
Resolution->>Resolution: 重新计算PageRank
Resolution-->>Index: 返回消歧后的图谱
Index->>ES: set_graph(resolved_graph)
Index->>Lock: release()
end
end
rect rgb(255, 240, 245)
Note over Index,Community: 核心层4:社区检测(可选)
alt with_community == True
Index->>Lock: spin_acquire("graphrag_task_{kb_id}")
Lock-->>Index: 获取锁
Index->>Community: CommunityReportsExtractor()(graph)
Community->>Community: leiden.run(graph)<br/>Leiden算法社区检测
loop 每个社区(并行)
Community->>Community: 收集社区内实体和关系
Community->>LLM: 生成社区摘要<br/>COMMUNITY_REPORT_PROMPT
LLM-->>Community: 返回摘要报告<br/>{title, summary, findings, rating}
Community->>ES: 保存社区报告<br/>knowledge_graph_kwd="community_report"
end
Community-->>Index: 返回社区报告列表
Index->>Lock: release()
end
end
Index-->>TaskExec: 返回执行结果<br/>{ok_docs, failed_docs, seconds}
TaskExec->>TaskExec: set_progress(task_id, prog=1.0)
TaskExec->>Redis: ACK消息
Note over User: 用户可通过 /api/v1/kb/trace_graphrag 查询进度
时序图说明:
- API层(步骤1-11):用户发起请求 → 校验权限 → 创建任务 → 返回task_id
  - 关键代码:api/apps/kb_app.py::run_graphrag()
  - 边界条件:检查是否有正在运行的任务,避免重复执行
  - 异常处理:无效KB ID、权限不足返回错误
- 任务队列层(步骤12-14):异步从Redis队列获取任务
  - 关键代码:rag/svr/task_executor.py::collect() → handle_task() → do_handle_task()
  - 并发控制:消费者组 + ACK机制,支持多worker
  - 超时设置:单个任务最多3小时(@timeout(60*60*3))
- 子图生成(步骤15-28):并行处理每个文档
  - 关键代码:graphrag/general/index.py::generate_subgraph()
  - 并发度:最多4个文档并行,每个文档内最多10个chunk并行
  - LLM调用:每个chunk调用1次主提取 + 最多2次gleaning
  - Token优化:chunk内容限制1024 tokens
  - 幂等性:通过ES检查does_graph_contains(doc_id)避免重复
- 图谱合并(步骤29-38):全局锁保证一致性
  - 关键代码:graphrag/general/index.py::merge_subgraph()
  - 分布式锁:Redis锁graphrag_task_{kb_id},超时1200秒
  - PageRank:合并后重新计算全图的PageRank值
  - 增量更新:GraphChange记录新增/删除/更新的节点和边
- 实体消歧(步骤39-50,可选):合并同义实体
  - 关键代码:graphrag/entity_resolution.py::EntityResolution()
  - 候选筛选:编辑距离 < len/2(英文)或集合交集 >= 0.8(中文)
  - 批处理:100对/批,最多5个批次并行
  - LLM判断:仅对高相似度候选调用LLM确认
  - 超时策略:单批最多280秒,超时则跳过
- 社区检测(步骤51-62,可选):生成层级摘要
  - 关键代码:graphrag/general/community_reports_extractor.py::CommunityReportsExtractor()
  - Leiden算法:基于模块度的社区划分
  - 并行摘要:每个社区独立调用LLM生成摘要
  - 存储:社区报告独立存储,用于查询时召回
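时序图末尾提到可通过 /api/v1/kb/trace_graphrag 查询进度,下面是一个轮询进度的调用示意(服务地址与鉴权头为假设值,返回字段以实际接口为准):

```python
# 轮询 GraphRAG 构建进度的示意脚本(BASE_URL 与鉴权方式为假设值)
import time
import requests

BASE_URL = "http://localhost:9380"
HEADERS = {"Authorization": "Bearer <token>"}

def wait_graphrag(kb_id: str, interval: int = 10) -> dict:
    while True:
        resp = requests.get(
            f"{BASE_URL}/api/v1/kb/trace_graphrag",
            params={"kb_id": kb_id}, headers=HEADERS, timeout=30,
        )
        task = resp.json().get("data", {})
        progress = task.get("progress", 0)
        print(f"progress={progress}, msg={task.get('progress_msg', '')}")
        if progress in (-1, 1):   # 与任务状态约定一致:-1=失败,1=成功
            return task
        time.sleep(interval)
```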
2.2 GraphRAG 检索时序图
sequenceDiagram
autonumber
participant User as 用户查询
participant Chat as ChatAPI
participant Search as KGSearch<br/>graphrag.search
participant LLM as LLM Service
participant Embed as Embedding Service
participant ES as Elasticsearch
User->>Chat: 发起对话<br/>"RAGFlow如何工作?"
Chat->>Search: retrieval(question, kb_ids, max_token=8196)
rect rgb(240, 248, 255)
Note over Search,LLM: 步骤1:查询改写
Search->>ES: get_entity_type2samples(kb_ids)<br/>获取实体类型样本
ES-->>Search: {Person: ["张三", "李四"], Product: ["RAGFlow"]}
Search->>LLM: query_rewrite(question, type_samples)<br/>MINIRAG_QUERY2KWD_PROMPT
LLM-->>Search: {answer_type_keywords: ["Product"],<br/>entities_from_query: ["RAGFlow"]}
end
rect rgb(240, 255, 240)
Note over Search,ES: 步骤2:实体召回(3路并行)
par 路径1:关键词匹配实体
Search->>Embed: encode(entities_from_query)
Embed-->>Search: 查询向量
Search->>ES: 向量检索<br/>knowledge_graph_kwd="entity"<br/>top 56
ES-->>Search: ents_from_query<br/>{entity_kwd, rank_flt, n_hop_with_weight}
and 路径2:类型过滤实体
Search->>ES: 精确匹配<br/>entity_type_kwd=answer_type_keywords<br/>ORDER BY rank_flt DESC
ES-->>Search: ents_from_types<br/>高PageRank实体
and 路径3:关系召回
Search->>Embed: encode(question)
Embed-->>Search: 查询向量
Search->>ES: 向量检索<br/>knowledge_graph_kwd="relation"<br/>top 56
ES-->>Search: rels_from_txt<br/>{from_entity, to_entity, description}
end
end
rect rgb(255, 250, 240)
Note over Search,ES: 步骤3:N-hop路径扩展
Search->>Search: 解析n_hop_with_weight<br/>提取多跳路径
loop 每个路径
Search->>Search: 计算路径得分<br/>sim / (2 + hop_distance)
Search->>Search: 累加边的pagerank权重
end
end
rect rgb(250, 240, 255)
Note over Search,ES: 步骤4:融合排序
Search->>Search: 实体得分融合<br/>P(E|Q) = sim * pagerank * type_boost
Search->>Search: 关系得分融合<br/>sim * pagerank * (1 + type_boost)
Search->>Search: Top-K选择<br/>entities: 6, relations: 6
end
rect rgb(255, 245, 240)
Note over Search,ES: 步骤5:社区召回
Search->>ES: 查询相关社区<br/>entities_kwd IN top_entities<br/>ORDER BY weight_flt DESC<br/>top 1
ES-->>Search: community_reports
end
rect rgb(245, 255, 245)
Note over Search,Chat: 步骤6:构建上下文
Search->>Search: 格式化实体表格<br/>Entity, Score, Description
Search->>Search: 格式化关系表格<br/>From Entity, To Entity, Description
Search->>Search: 格式化社区报告<br/>Title, Summary, Findings
Search-->>Chat: 返回增强上下文<br/>{content_with_weight: "..."}
end
Chat->>LLM: 调用LLM生成回答<br/>context + question
LLM-->>Chat: 生成答案
Chat-->>User: 返回回答
检索时序图说明:
- 查询改写(步骤1-4):
  - 从图谱中采样实体类型(每类3个示例)
  - 使用LLM识别查询中的实体和期望答案类型
  - Prompt:MINIRAG_QUERY2KWD_PROMPT
  - 输出:answer_type_keywords(如Person/Product)、entities_from_query(如"RAGFlow")
- 实体召回(步骤5-14,3路并行):
  - 路径1:向量检索 + 关键词匹配,召回相似实体(top 56)
  - 路径2:类型过滤 + PageRank排序,召回高权重实体
  - 路径3:向量检索关系描述,召回相关关系(top 56)
  - 字段:entity_kwd、rank_flt(PageRank)、n_hop_with_weight(邻居路径)
- N-hop路径扩展(步骤15-19):
  - 解析n_hop_with_weight:[{path: [A, B, C], weights: [0.8, 0.6]}]
  - 计算路径得分:sim / (2 + hop_distance),距离越远衰减越大
  - 将N-hop关系补充到rels_from_txt
- 融合排序(步骤20-23):
  - 实体得分:sim * pagerank * type_boost;实体类型匹配answer_type_keywords时type_boost = 2
  - 关系得分:sim * pagerank * (1 + type_boost);关系两端实体类型匹配时增加boost
  - Top-K:entities取前6,relations取前6
- 社区召回(步骤24-25):
  - 查询条件:entities_kwd IN top_entities
  - 排序:weight_flt DESC(社区权重 = 社区内实体度数和)
  - 限制:top 1(最相关的社区报告)
- 构建上下文(步骤26-29):
  - 格式化为CSV表格(Pandas DataFrame)
  - Token限制:max_token = 8196,超出则截断
  - 返回结构示例:

    {
      "content_with_weight": """
      ---- Entities ----
      Entity,Score,Description
      RAGFlow,0.95,"开源RAG引擎"

      ---- Relations ----
      From Entity,To Entity,Score,Description
      RAGFlow,LLM,0.87,"RAGFlow使用LLM生成答案"

      ---- Community Report ----
      # RAG技术栈
      ## Content
      该社区包含RAG相关技术...
      """,
      "docnm_kwd": "Related content in Knowledge Graph",
      "similarity": 1.0
    }
性能优化点:
- 并行召回:3路召回并行执行,降低延迟
- 向量缓存:LLM Cache机制缓存重复查询
- 得分融合:结合语义相似度(sim)和图结构重要性(pagerank)
- Token预算:动态截断,确保不超过上下文窗口
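上述融合排序的打分公式可以用几行代码直观复现(变量名为示意,非源码原名):

```python
# 检索打分融合的极简复现:实体 sim*pagerank*type_boost,
# 关系 sim*pagerank*(1+type_boost),N-hop 路径按距离衰减
def entity_score(sim: float, pagerank: float, type_matched: bool) -> float:
    type_boost = 2.0 if type_matched else 1.0   # 类型命中 answer_type_keywords 时加倍
    return sim * pagerank * type_boost

def relation_score(sim: float, pagerank: float, type_boost: float = 0.0) -> float:
    return sim * pagerank * (1.0 + type_boost)

def hop_score(sim: float, hop_distance: int) -> float:
    return sim / (2 + hop_distance)             # 距离越远衰减越大

print(entity_score(0.9, 0.05, True))   # 0.09
print(hop_score(0.8, 2))               # 0.2
```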
3. 核心模块调用链路详解
3.1 API层入口(api/apps/kb_app.py)
3.1.1 run_graphrag() 函数
# 文件:api/apps/kb_app.py
@manager.route("/run_graphrag", methods=["POST"])
@login_required
def run_graphrag():
"""
GraphRAG构建入口API
请求:POST /api/v1/kb/run_graphrag
参数:{"kb_id": "xxx"}
返回:{"graphrag_task_id": "xxx"}
"""
req = request.json
kb_id = req.get("kb_id", "")
# 1. 校验知识库是否存在
ok, kb = KnowledgebaseService.get_by_id(kb_id)
if not ok:
return get_error_data_result(message="Invalid Knowledgebase ID")
# 2. 检查是否有正在运行的任务(防止重复执行)
task_id = kb.graphrag_task_id
if task_id:
ok, task = TaskService.get_by_id(task_id)
if task and task.progress not in [-1, 1]: # -1=失败, 1=成功
return get_error_data_result(
message=f"Task {task_id} in progress. A Graph Task is already running."
)
# 3. 获取知识库下所有文档
documents, _ = DocumentService.get_by_kb_id(
kb_id=kb_id,
page_number=0,
items_per_page=0, # 0表示获取全部
orderby="create_time",
desc=False,
)
if not documents:
return get_error_data_result(message=f"No documents in Knowledgebase {kb_id}")
# 4. 创建异步任务
sample_document = documents[0]
document_ids = [document["id"] for document in documents]
task_id = queue_raptor_o_graphrag_tasks(
doc=sample_document,
ty="graphrag",
priority=0,
fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, # 占位ID,避免与普通文档任务冲突
doc_ids=list(document_ids)
)
# 5. 更新知识库的graphrag_task_id字段
KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": task_id})
return get_json_result(data={"graphrag_task_id": task_id})
关键点:
- 幂等性检查:通过kb.graphrag_task_id和任务进度判断是否有正在运行的任务
- 异步解耦:立即返回task_id,任务在后台执行(触发方式见下方调用示意)
- 全量文档:一次性处理知识库下所有文档
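一个触发构建的请求示意如下(服务地址与鉴权头为假设值):

```python
# 触发 GraphRAG 构建的请求示意(鉴权方式以实际部署为准)
import requests

resp = requests.post(
    "http://localhost:9380/api/v1/kb/run_graphrag",
    json={"kb_id": "your_kb_id"},
    headers={"Authorization": "Bearer <token>"},
    timeout=30,
)
print(resp.json())   # 预期包含 {"data": {"graphrag_task_id": "..."}}
```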
3.2 任务队列层(api/db/services/document_service.py)
3.2.1 queue_raptor_o_graphrag_tasks() 函数
# 文件:api/db/services/document_service.py
def queue_raptor_o_graphrag_tasks(doc, ty, priority, fake_doc_id="", doc_ids=[]):
"""
创建GraphRAG/Raptor任务并推入Redis队列
参数:
doc: 文档对象(用于获取配置)
ty: 任务类型("graphrag" | "raptor")
priority: 优先级(0-9)
fake_doc_id: 占位文档ID(用于知识库级别的任务)
doc_ids: 需要处理的文档ID列表
返回:
task_id: 任务ID
"""
# 1. 获取分块配置(用于计算任务摘要)
chunking_config = DocumentService.get_chunking_config(doc["id"])
hasher = xxhash.xxh64()
for field in sorted(chunking_config.keys()):
hasher.update(str(chunking_config[field]).encode("utf-8"))
# 2. 创建任务对象
task = {
"id": get_uuid(),
"doc_id": fake_doc_id if fake_doc_id else doc["id"],
"from_page": 100000000, # 占位值
"to_page": 100000000,
"task_type": ty,
"progress_msg": datetime.now().strftime("%H:%M:%S") + " created task " + ty,
"begin_at": datetime.now(),
}
# 3. 计算任务摘要(用于去重)
for field in ["doc_id", "from_page", "to_page"]:
hasher.update(str(task.get(field, "")).encode("utf-8"))
hasher.update(ty.encode("utf-8"))
task["digest"] = hasher.hexdigest()
# 4. 持久化到MySQL
bulk_insert_into_db(Task, [task], True) # True表示INSERT IGNORE
# 5. 推入Redis队列
if ty in ["graphrag", "raptor", "mindmap"]:
task["doc_ids"] = doc_ids # 携带文档ID列表
DocumentService.begin2parse(doc["id"]) # 更新文档状态为"解析中"
assert REDIS_CONN.queue_product(
get_svr_queue_name(priority), # 队列名:ragflow_svr_0 ~ ragflow_svr_9
message=task
), "Can't access Redis. Please check the Redis' status."
return task["id"]
关键点:
- 任务摘要(digest):基于配置和参数计算哈希,用于去重
- INSERT IGNORE:避免重复创建相同任务
- Redis Stream:使用XADD命令推入消息队列
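digest 去重的思路可以抽成一个独立的小函数来验证:相同配置加相同参数必然得到相同哈希,配合 INSERT IGNORE 即可实现幂等(以下为示意,字段集合按上文代码):

```python
# 任务摘要(digest)计算的独立示意:输入相同 => 哈希相同 => 重复任务被 INSERT IGNORE 吞掉
import xxhash

def task_digest(chunking_config: dict, task: dict, task_type: str) -> str:
    hasher = xxhash.xxh64()
    for field in sorted(chunking_config.keys()):
        hasher.update(str(chunking_config[field]).encode("utf-8"))
    for field in ["doc_id", "from_page", "to_page"]:
        hasher.update(str(task.get(field, "")).encode("utf-8"))
    hasher.update(task_type.encode("utf-8"))
    return hasher.hexdigest()

d1 = task_digest({"chunk_size": 512}, {"doc_id": "d1"}, "graphrag")
d2 = task_digest({"chunk_size": 512}, {"doc_id": "d1"}, "graphrag")
assert d1 == d2   # 幂等:重复提交产生相同 digest
```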
3.3 任务执行层(rag/svr/task_executor.py)
3.3.1 collect() 和 handle_task() 函数
# 文件:rag/svr/task_executor.py
async def collect():
"""从Redis队列获取任务"""
global UNACKED_ITERATOR
svr_queue_names = get_svr_queue_names() # [ragflow_svr_0, ..., ragflow_svr_9]
# 1. 先处理未ACK的消息(重试机制)
if not UNACKED_ITERATOR:
UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(
svr_queue_names,
SVR_CONSUMER_GROUP_NAME, # ragflow_group
CONSUMER_NAME # 当前worker的唯一名称
)
try:
redis_msg = next(UNACKED_ITERATOR)
except StopIteration:
# 2. 没有未ACK的消息,获取新消息
for svr_queue_name in svr_queue_names:
redis_msg = REDIS_CONN.queue_consumer(
svr_queue_name,
SVR_CONSUMER_GROUP_NAME,
CONSUMER_NAME
)
if redis_msg:
break
if not redis_msg:
return None, None
msg = redis_msg.get_message()
# 3. 特殊任务处理(graphrag/raptor/mindmap)
if msg.get("doc_id", "") in [GRAPH_RAPTOR_FAKE_DOC_ID, CANVAS_DEBUG_DOC_ID]:
task = msg
if task["task_type"] in ["graphrag", "raptor", "mindmap"] and msg.get("doc_ids", []):
task = TaskService.get_task(msg["id"], msg["doc_ids"])
task["doc_ids"] = msg["doc_ids"]
else:
task = TaskService.get_task(msg["id"])
# 4. 检查任务是否已取消
if not task or has_canceled(task["id"]):
redis_msg.ack() # 确认消息
return None, None
return redis_msg, task
async def handle_task():
"""处理单个任务"""
global DONE_TASKS, FAILED_TASKS
redis_msg, task = await collect()
if not task:
await trio.sleep(5)
return
task_type = task["task_type"]
try:
logging.info(f"handle_task begin for task {json.dumps(task)}")
CURRENT_TASKS[task["id"]] = copy.deepcopy(task)
# 调用具体的任务处理函数
await do_handle_task(task)
DONE_TASKS += 1
CURRENT_TASKS.pop(task["id"], None)
logging.info(f"handle_task done for task {json.dumps(task)}")
except Exception as e:
FAILED_TASKS += 1
CURRENT_TASKS.pop(task["id"], None)
# 记录错误信息
set_progress(task["id"], prog=-1, msg=f"[Exception]: {str(e)}")
logging.exception(f"handle_task got exception for task {json.dumps(task)}")
finally:
# 记录任务日志
if task_type in ["graphrag", "raptor", "mindmap"]:
task_document_ids = task["doc_ids"]
PipelineOperationLogService.record_pipeline_operation(
document_id=task["doc_id"],
pipeline_id="",
task_type=TASK_TYPE_TO_PIPELINE_TASK_TYPE.get(task_type),
fake_document_ids=task_document_ids
)
redis_msg.ack() # 确认消息
关键点:
- 消费者组(Consumer Group):支持多worker并行,自动负载均衡
- 未ACK重试:处理失败的任务会重新投递
- 全局任务追踪:CURRENT_TASKS记录正在执行的任务
- 异常容错:捕获异常后记录到任务进度,不影响其他任务
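消费者组 + ACK 的语义可以用 redis-py 单独演示(队列名、组名按上文约定,仅示意 PEL 重试机制,非 RAGFlow 源码):

```python
# Redis Stream 消费者组语义示意:未 ACK 的消息留在 PEL 中,可被重新投递
import redis

r = redis.Redis()
STREAM, GROUP, CONSUMER = "ragflow_svr_0", "ragflow_group", "worker-1"

try:
    r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
except redis.ResponseError:
    pass  # 消费者组已存在

msgs = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=1, block=5000)
for _stream, entries in msgs or []:
    for msg_id, _fields in entries:
        # ... 处理任务;若抛异常则不 ACK,消息停留在 PEL 等待重试 ...
        r.xack(STREAM, GROUP, msg_id)   # 处理成功才确认
```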
3.3.2 do_handle_task() - GraphRAG分支
# 文件:rag/svr/task_executor.py
@timeout(60*60*3, 1) # 超时3小时
async def do_handle_task(task):
task_type = task.get("task_type", "")
# ... 省略其他任务类型 ...
if task_type == "graphrag":
start_ts = timer()
# 1. 获取知识库配置
kb_id = task["kb_id"]
_, kb = KnowledgebaseService.get_by_id(kb_id)
kb_parser_config = kb.parser_config or {}
graphrag_conf = kb_parser_config.get("graphrag", {})
# 2. 确定语言
task_language = graphrag_conf.get("language", "English")
# 3. 获取LLM和Embedding模型
chat_model = LLMBundle(task["tenant_id"], LLMType.CHAT, kb.llm_id)
embedding_model = LLMBundle(task["tenant_id"], LLMType.EMBEDDING, kb.embd_id)
# 4. 进度回调函数
def progress_callback(prog=None, msg=""):
set_progress(task["id"], prog=prog, msg=msg)
# 5. 读取配置项
with_resolution = graphrag_conf.get("resolution", True) # 是否实体消歧
with_community = graphrag_conf.get("community", False) # 是否社区检测
# 6. 调用GraphRAG核心函数
async with kg_limiter: # 限流:最多并发kg_limiter个GraphRAG任务
result = await run_graphrag_for_kb(
row=task,
doc_ids=task.get("doc_ids", []),
language=task_language,
kb_parser_config=kb_parser_config,
chat_model=chat_model,
embedding_model=embedding_model,
callback=progress_callback,
with_resolution=with_resolution,
with_community=with_community,
)
logging.info(f"GraphRAG task result:\n{result}")
progress_callback(prog=1.0, msg=f"Knowledge Graph done ({timer() - start_ts:.2f}s)")
return
关键点:
- 超时保护:@timeout(60*60*3),3小时超时
- 并发限流:kg_limiter控制同时执行的GraphRAG任务数(语义见下方示意)
- 进度回调:实时更新任务进度,用户可通过API查询
- 配置驱动:with_resolution和with_community可配置
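kg_limiter 的限流语义可以用 trio 的 CapacityLimiter 示意(并发数为假设值,实际实现以源码配置为准):

```python
# 用 trio.CapacityLimiter 演示"最多 N 个 GraphRAG 任务并发"的限流语义
import trio

kg_limiter = trio.CapacityLimiter(2)   # 假设最多 2 个任务并发

async def run_one_task(task_id: int):
    async with kg_limiter:             # 超出并发数的任务在此排队等待
        print(f"task {task_id} running")
        await trio.sleep(1)

async def main():
    async with trio.open_nursery() as nursery:
        for i in range(5):
            nursery.start_soon(run_one_task, i)

trio.run(main)
```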
3.4 GraphRAG核心层(graphrag/general/index.py)
3.4.1 run_graphrag_for_kb() 主函数
# 文件:graphrag/general/index.py
async def run_graphrag_for_kb(
row: dict,
doc_ids: list[str],
language: str,
kb_parser_config: dict,
chat_model,
embedding_model,
callback,
*,
with_resolution: bool = True,
with_community: bool = True,
max_parallel_docs: int = 4,
) -> dict:
"""
知识库级别的GraphRAG构建
参数:
row: 任务对象,包含tenant_id, kb_id等
doc_ids: 需要处理的文档ID列表
language: 语言(English/Chinese)
kb_parser_config: 解析配置
chat_model/embedding_model: LLM和Embedding服务
callback: 进度回调
with_resolution: 是否实体消歧
with_community: 是否社区检测
max_parallel_docs: 最大并行文档数
返回:
{"ok_docs": [...], "failed_docs": [...], "total_docs": N, "total_chunks": M, "seconds": T}
"""
tenant_id, kb_id = row["tenant_id"], row["kb_id"]
start = trio.current_time()
# 1. 如果未指定doc_ids,则获取知识库下所有文档
if not doc_ids:
docs, _ = DocumentService.get_by_kb_id(kb_id=kb_id, ...)
doc_ids = [doc["id"] for doc in docs]
doc_ids = list(dict.fromkeys(doc_ids)) # 去重
if not doc_ids:
callback(msg=f"[GraphRAG] kb:{kb_id} has no processable doc_id.")
return {"ok_docs": [], "failed_docs": [], "total_docs": 0, ...}
# 2. 加载所有文档的chunks(合并小chunk以节省LLM调用)
def load_doc_chunks(doc_id: str) -> list[str]:
chunks = []
current_chunk = ""
for d in settings.retriever.chunk_list(doc_id, tenant_id, [kb_id], ...):
content = d["content_with_weight"]
if num_tokens_from_string(current_chunk + content) < 1024:
current_chunk += content # 合并
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = content
if current_chunk:
chunks.append(current_chunk)
return chunks
all_doc_chunks: dict[str, list[str]] = {}
total_chunks = 0
for doc_id in doc_ids:
chunks = load_doc_chunks(doc_id)
all_doc_chunks[doc_id] = chunks
total_chunks += len(chunks)
# 3. 并行生成子图(每个文档一个子图)
semaphore = trio.Semaphore(max_parallel_docs) # 最多4个文档并行
subgraphs: dict[str, nx.Graph] = {}
failed_docs: list[tuple[str, str]] = []
async def build_one(doc_id: str):
"""构建单个文档的子图"""
chunks = all_doc_chunks.get(doc_id, [])
if not chunks:
return
# 选择Light或General模式
kg_extractor = (
LightKGExt
if kb_parser_config.get("graphrag", {}).get("method") != "general"
else GeneralKGExt
)
async with semaphore:
try:
deadline = max(120, len(chunks) * 60 * 10) # 超时时间:120s ~ chunks*10min
callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} start (chunks={len(chunks)})")
with trio.fail_after(deadline):
sg = await generate_subgraph(
kg_extractor,
tenant_id,
kb_id,
doc_id,
chunks,
language,
kb_parser_config.get("graphrag", {}).get("entity_types", []),
chat_model,
embedding_model,
callback,
)
if sg:
subgraphs[doc_id] = sg
callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} done")
else:
failed_docs.append((doc_id, "subgraph is empty"))
except Exception as e:
failed_docs.append((doc_id, repr(e)))
callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} FAILED: {e!r}")
# trio.open_nursery(): 并行执行多个异步任务
async with trio.open_nursery() as nursery:
for doc_id in doc_ids:
nursery.start_soon(build_one, doc_id)
ok_docs = [d for d in doc_ids if d in subgraphs]
if not ok_docs:
callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs generated, end.")
return {"ok_docs": [], "failed_docs": failed_docs, ...}
# 4. 合并子图到全局图谱(串行,加锁)
kb_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value="batch_merge", timeout=1200)
await kb_lock.spin_acquire()
callback(msg=f"[GraphRAG] kb:{kb_id} merge lock acquired")
try:
final_graph = None
for doc_id in ok_docs:
sg = subgraphs[doc_id]
new_graph = await merge_subgraph(
tenant_id, kb_id, doc_id, sg, embedding_model, callback
)
if new_graph is not None:
final_graph = new_graph # 保留最新的图谱引用
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished, graph ready.")
finally:
kb_lock.release()
# 5. 实体消歧(可选)
if with_resolution and final_graph:
await kb_lock.spin_acquire()
subgraph_nodes = set()
for sg in subgraphs.values():
subgraph_nodes.update(set(sg.nodes()))
await resolve_entities(
final_graph, subgraph_nodes, tenant_id, kb_id, None,
chat_model, embedding_model, callback
)
kb_lock.release()
# 6. 社区检测(可选)
if with_community and final_graph:
await kb_lock.spin_acquire()
await extract_community(
final_graph, tenant_id, kb_id, None,
chat_model, embedding_model, callback
)
kb_lock.release()
now = trio.current_time()
callback(msg=f"[GraphRAG] GraphRAG for KB {kb_id} done in {now - start:.2f}s.")
return {
"ok_docs": ok_docs,
"failed_docs": failed_docs,
"total_docs": len(doc_ids),
"total_chunks": total_chunks,
"seconds": now - start,
}
关键点:
- Chunk合并:将小chunk合并到1024 tokens以内,减少LLM调用次数
- 并行子图生成:trio.Semaphore(4)控制最多4个文档并行
- 超时策略:每个文档的超时时间 = max(120, chunks数 * 10分钟)
- 分布式锁:合并、消歧、社区检测阶段加全局锁,保证一致性(合并与PageRank重算见下方示意)
- 容错机制:单个文档失败不影响其他文档
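子图合并与 PageRank 重算的结构部分可以用 NetworkX 最小化复现(真实的 graph_merge 还会合并 description、weight 等属性,此处仅示意结构合并):

```python
# NetworkX 子图合并 + 全图 PageRank 重算的最小示意
import networkx as nx

old_graph = nx.Graph()
old_graph.add_edge("RAGFlow", "LLM", weight=9)

subgraph = nx.Graph()
subgraph.add_edge("RAGFlow", "Elasticsearch", weight=8)

merged = nx.compose(old_graph, subgraph)   # 节点/边取并集,重叠属性以后者为准
pagerank = nx.pagerank(merged)             # 合并后重算全图 PageRank
for node, pr in sorted(pagerank.items(), key=lambda x: -x[1]):
    print(f"{node}: {pr:.4f}")             # "RAGFlow" 度数最高,PageRank 最大
```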
3.5 实体关系抽取层(graphrag/general/graph_extractor.py)
3.5.1 GraphExtractor 核心逻辑
# 文件:graphrag/general/graph_extractor.py
class GraphExtractor(Extractor):
"""General模式的图谱抽取器(使用LLM)"""
async def __call__(self, doc_id: str, chunks: list[str], callback: Callable | None = None):
"""
从chunks中抽取实体和关系
参数:
doc_id: 文档ID
chunks: 文档内容分块(已合并到1024 tokens以内)
callback: 进度回调
返回:
(all_entities_data, all_relationships_data)
"""
self.callback = callback
start_ts = trio.current_time()
# 1. 并行处理所有chunks(最多10个并行)
async def extract_all(doc_id, chunks, max_concurrency=10):
out_results = []
limiter = trio.Semaphore(max_concurrency)
async def worker(chunk_key_dp: tuple[str, str], idx: int, total: int):
async with limiter:
await self._process_single_content(chunk_key_dp, idx, total, out_results)
async with trio.open_nursery() as nursery:
for i, ck in enumerate(chunks):
nursery.start_soon(worker, (doc_id, ck), i, len(chunks))
return out_results
out_results = await extract_all(doc_id, chunks)
# 2. 合并所有chunk的结果
maybe_nodes = defaultdict(list) # {entity_name: [entity_data, ...]}
maybe_edges = defaultdict(list) # {(src, dst): [relation_data, ...]}
sum_token_count = 0
for m_nodes, m_edges, token_count in out_results:
for k, v in m_nodes.items():
maybe_nodes[k].extend(v)
for k, v in m_edges.items():
maybe_edges[tuple(sorted(k))].extend(v)
sum_token_count += token_count
callback(msg=f"Entities extraction done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {sum_token_count} tokens")
# 3. 合并同名实体(并行)
all_entities_data = []
async with trio.open_nursery() as nursery:
for en_nm, ents in maybe_nodes.items():
nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data)
# 4. 合并同关系边(并行)
all_relationships_data = []
async with trio.open_nursery() as nursery:
for (src, tgt), rels in maybe_edges.items():
nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data)
return all_entities_data, all_relationships_data
async def _process_single_content(self, chunk_key_dp: tuple[str, str], chunk_seq: int, num_chunks: int, out_results):
"""处理单个chunk,提取实体和关系"""
chunk_key = chunk_key_dp[0] # doc_id
content = chunk_key_dp[1] # chunk文本
token_count = 0
# 1. 构建主提取Prompt
variables = {
**self._prompt_variables, # tuple_delimiter, record_delimiter等
"input_text": content,
}
hint_prompt = perform_variable_replacements(GRAPH_EXTRACTION_PROMPT, variables=variables)
# 2. 第一次LLM调用(主提取)
async with chat_limiter: # 限流:避免LLM并发过高
response = await trio.to_thread.run_sync(
lambda: self._chat(hint_prompt, [{"role": "user", "content": "Output:"}], {})
)
token_count += num_tokens_from_string(hint_prompt + response)
results = response or ""
history = [
{"role": "system", "content": hint_prompt},
{"role": "user", "content": response}
]
# 3. Gleaning:多次迭代确保完整性(最多2次)
for i in range(self._max_gleanings): # _max_gleanings = 2
history.append({"role": "user", "content": CONTINUE_PROMPT})
async with chat_limiter:
response = await trio.to_thread.run_sync(lambda: self._chat("", history, {}))
token_count += num_tokens_from_string(response)
results += response or ""
# 最后一次迭代不需要判断是否继续
if i >= self._max_gleanings - 1:
break
# 4. 询问LLM是否还有遗漏的实体
history.append({"role": "assistant", "content": response})
history.append({"role": "user", "content": LOOP_PROMPT}) # "还有吗?回答YES或NO"
async with chat_limiter:
continuation = await trio.to_thread.run_sync(lambda: self._chat("", history))
token_count += num_tokens_from_string(continuation)
if continuation != "Y": # LLM回答NO,停止gleaning
break
history.append({"role": "assistant", "content": "Y"})
# 5. 解析LLM返回的实体和关系
records = split_string_by_multi_markers(results, ["##", "<|COMPLETE|>"])
rcds = []
for record in records:
match = re.search(r"\((.*)\)", record) # 提取括号内内容
if match:
rcds.append(match.group(1))
maybe_nodes, maybe_edges = self._entities_and_relations(chunk_key, rcds, "<|>")
out_results.append((maybe_nodes, maybe_edges, token_count))
if self.callback:
self.callback(
0.5 + 0.1 * len(out_results) / num_chunks,
msg=f"Entities extraction of chunk {chunk_seq} {len(out_results)}/{num_chunks} done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges"
)
def _entities_and_relations(self, chunk_key: str, records: list, tuple_delimiter: str):
"""解析LLM返回的记录,分离实体和关系"""
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
ent_types = [t.lower() for t in self._entity_types] # 允许的实体类型
for record in records:
# record示例: "实体<|>RAGFlow<|>Product<|>开源RAG引擎<|>10"
# 或: "关系<|>RAGFlow<|>LLM<|>uses<|>使用LLM生成答案<|>8"
record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])
# 尝试解析为实体
if_entities = handle_single_entity_extraction(record_attributes, chunk_key)
if if_entities and if_entities.get("entity_type", "").lower() in ent_types:
maybe_nodes[if_entities["entity_name"]].append(if_entities)
continue
# 尝试解析为关系
if_relation = handle_single_relationship_extraction(record_attributes, chunk_key)
if if_relation:
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(if_relation)
return dict(maybe_nodes), dict(maybe_edges)
async def _merge_nodes(self, entity_name: str, entities: list[dict], all_entities_data):
"""合并同名实体"""
if not entities:
return
# 1. 统计实体类型(选择出现最多的)
entity_type = sorted(
Counter([dp["entity_type"] for dp in entities]).items(),
key=lambda x: x[1],
reverse=True
)[0][0]
# 2. 合并描述(用<SEP>分隔)
description = "<SEP>".join(sorted(set([dp["description"] for dp in entities])))
# 3. 收集来源文档
already_source_ids = flat_uniq_list(entities, "source_id")
# 4. 如果描述过多(>12个),调用LLM总结
description = await self._handle_entity_relation_summary(entity_name, description)
node_data = {
"entity_name": entity_name,
"entity_type": entity_type,
"description": description,
"source_id": already_source_ids,
}
all_entities_data.append(node_data)
async def _merge_edges(self, src_id: str, tgt_id: str, edges_data: list[dict], all_relationships_data):
"""合并同关系边"""
if not edges_data:
return
# 1. 累加权重
weight = sum([edge["weight"] for edge in edges_data])
# 2. 合并描述
description = "<SEP>".join(sorted(set([edge["description"] for edge in edges_data])))
description = await self._handle_entity_relation_summary(f"{src_id} -> {tgt_id}", description)
# 3. 合并关键词和来源
keywords = flat_uniq_list(edges_data, "keywords")
source_id = flat_uniq_list(edges_data, "source_id")
edge_data = {
"src_id": src_id,
"tgt_id": tgt_id,
"description": description,
"keywords": keywords,
"weight": weight,
"source_id": source_id
}
all_relationships_data.append(edge_data)
async def _handle_entity_relation_summary(self, entity_or_relation_name: str, description: str) -> str:
"""如果描述过多(>12个),调用LLM总结"""
description_list = description.split("<SEP>")
if len(description_list) <= 12:
return description
# 调用LLM生成摘要
prompt = SUMMARIZE_DESCRIPTIONS_PROMPT.format(
entity_name=entity_or_relation_name,
description_list=description_list,
language=self._language
)
async with chat_limiter:
summary = await trio.to_thread.run_sync(
self._chat, "", [{"role": "user", "content": prompt}]
)
return summary
关键点:
- 并发处理:10个chunk并行,每个chunk独立调用LLM
- Gleaning机制:每个chunk最多3次LLM调用(1次主提取 + 2次补充)
- LLM Cache:_chat()内置缓存,相同输入直接返回缓存结果
- 描述合并:同名实体的描述用<SEP>分隔,超过12个则调用LLM总结
- 权重累加:同关系边的权重相加(用于PageRank计算)
Prompt示例(GRAPH_EXTRACTION_PROMPT):
-Goal-
Given a text document that is potentially relevant to this activity and a list of entity types, identify all entities of those types from the text and all relationships among the identified entities.
-Steps-
1. Identify all entities. For each identified entity, extract the following information:
- entity_name: Name of the entity, capitalized
- entity_type: One of the following types: [Person,Organization,Location,Event,Category]
- entity_description: Comprehensive description of the entity's attributes and activities
Format each entity as ("entity"<|>entity_name<|>entity_type<|>entity_description)
2. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are *clearly related* to each other.
For each pair of related entities, extract the following information:
- source_entity: name of the source entity, as identified in step 1
- target_entity: name of the target entity, as identified in step 1
- relationship_description: explanation as to why you think the source entity and the target entity are related to each other
- relationship_strength: an integer score between 1 to 10, indicating strength of the relationship between the source entity and target entity
Format each relationship as ("relationship"<|>source_entity<|>target_entity<|>relationship_description<|>relationship_strength)
3. Return output in {language} as a single list of all the entities and relationships identified in steps 1 and 2. Use **##** as the list delimiter.
4. If you have to translate, just translate the descriptions, nothing else!
5. When finished, output <|COMPLETE|>
######################
-Examples-
######################
{examples}
######################
-Real Data-
######################
Entity_types: {entity_types}
Text: {input_text}
######################
Output:
LLM返回示例:
("entity"<|>RAGFlow<|>Product<|>An open-source RAG engine for building AI applications<|>10)##
("entity"<|>LLM<|>Technology<|>Large Language Model used for natural language processing<|>9)##
("entity"<|>Elasticsearch<|>Technology<|>Search and analytics engine<|>8)##
("relationship"<|>RAGFlow<|>LLM<|>RAGFlow uses LLM to generate responses<|>9)##
("relationship"<|>RAGFlow<|>Elasticsearch<|>RAGFlow stores data in Elasticsearch<|>8)##
<|COMPLETE|>
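按照 Prompt 约定的分隔符(## 与 <|>),上面这段返回可以这样解析(独立示意,对应源码中 _entities_and_relations 的逻辑):

```python
# 解析 LLM 返回记录的独立示意:按 ## 分段,按 <|> 拆字段
import re

raw = ('("entity"<|>RAGFlow<|>Product<|>An open-source RAG engine)##'
       '("relationship"<|>RAGFlow<|>LLM<|>RAGFlow uses LLM to generate responses<|>9)##'
       '<|COMPLETE|>')

entities, relations = [], []
for record in raw.split("##"):
    m = re.search(r"\((.*)\)", record)           # 只处理括号包裹的记录
    if not m:
        continue
    attrs = m.group(1).split("<|>")
    if attrs[0] == '"entity"':
        entities.append({"name": attrs[1], "type": attrs[2], "description": attrs[3]})
    elif attrs[0] == '"relationship"':
        relations.append({"src": attrs[1], "tgt": attrs[2],
                          "description": attrs[3], "weight": int(attrs[4])})

print(entities)    # [{'name': 'RAGFlow', 'type': 'Product', ...}]
print(relations)   # [{'src': 'RAGFlow', 'tgt': 'LLM', ..., 'weight': 9}]
```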
4. 模块交互矩阵
| 调用方 | 被调方 | 调用方式 | 数据流 | 错误语义 | 一致性要求 |
|---|---|---|---|---|---|
| kb_app.run_graphrag() | KnowledgebaseService | 同步函数调用 | KB配置 | 返回错误码 | 读已提交 |
| kb_app.run_graphrag() | queue_raptor_o_graphrag_tasks() | 同步函数调用 | 任务参数 | 抛出异常 | 强一致性(MySQL) |
| TaskExecutor | Redis Stream | 异步队列消费 | 任务消息 | ACK/NACK | 至少一次投递 |
| run_graphrag_for_kb() | generate_subgraph() | 异步并行调用 | chunks → subgraph | 单文档失败不影响其他 | 最终一致性 |
| GraphExtractor | LLM Service | 异步并发调用 | chunk → entities/relations | 重试3次后放弃 | 无状态 |
| merge_subgraph() | Elasticsearch | 异步批量写入 | 节点/边数据 | 抛出异常回滚 | 最终一致性 |
| EntityResolution | Redis分布式锁 | spin_acquire() | 锁状态 | 超时返回失败 | 线性一致性 |
| KGSearch.retrieval() | Elasticsearch | 异步并发查询 | 查询 → 实体/关系/社区 | 降级返回空 | 弱一致性 |
5. 关键设计与权衡
5.1 一致性模型
分布式锁(Redis):
- 使用场景:图谱合并、实体消歧、社区检测
- 实现:RedisDistributedLock,通过spin_acquire()自旋获取
- 超时:1200秒(20分钟)
- 权衡:牺牲可用性换取一致性,避免并发修改导致的图谱冲突
最终一致性(Elasticsearch):
- 使用场景:子图存储、实体/关系/社区索引
- 实现:批量写入(bulk_size=4)
- 权衡:允许短暂不一致,提升写入吞吐量
幂等性设计:
- 任务去重:通过digest(xxhash)避免重复任务
- 子图覆盖:通过does_graph_contains(doc_id)检查,存在则跳过
- ACK机制:Redis Stream确保消息至少被处理一次
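分布式锁"自旋获取 + 过期自动释放"的语义可以用 SET NX EX 示意(RedisDistributedLock 的真实实现以源码为准;生产环境的释放通常用 Lua 脚本保证原子性):

```python
# Redis 分布式锁语义示意:SET NX EX 抢锁,自旋等待,释放前校验持有者
import time
import redis

r = redis.Redis()

def spin_acquire(key: str, value: str, timeout: int = 1200, interval: float = 1.0):
    """自旋直到拿到锁;锁带过期时间,持有者崩溃后可自动释放"""
    while not r.set(key, value, nx=True, ex=timeout):
        time.sleep(interval)

def release(key: str, value: str):
    if r.get(key) == value.encode():   # 仅删除自己持有的锁(示意;原子版本应用 Lua)
        r.delete(key)

spin_acquire("graphrag_task_kb123", "batch_merge")
try:
    pass  # ... 图谱合并等临界区操作 ...
finally:
    release("graphrag_task_kb123", "batch_merge")
```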
5.2 并发控制
层级并发:
- 知识库级别:最多1个GraphRAG任务(通过KB锁)
- 文档级别:最多4个文档并行(Semaphore(4))
- Chunk级别:最多10个chunk并行(Semaphore(10))
- LLM调用:全局限流chat_limiter(避免LLM服务过载)
超时策略:
- 任务级别:3小时(@timeout(60*60*3))
- 文档级别:max(120, chunks数 * 10分钟),触发方式见下方示意
- LLM调用:20分钟(@timeout(60*20))
- 社区摘要:120秒/社区(@timeout(120))
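文档级超时的计算与触发方式如下示意(与上文 run_graphrag_for_kb 中的 trio.fail_after 用法一致):

```python
# 文档级超时示意:超时阈值随 chunk 数线性放大,下限 120 秒
import trio

async def build_with_deadline(num_chunks: int):
    deadline = max(120, num_chunks * 60 * 10)   # 每个 chunk 预留 10 分钟
    try:
        with trio.fail_after(deadline):
            await trio.sleep(1)                 # 此处代以实际的 generate_subgraph(...)
    except trio.TooSlowError:
        print(f"subgraph build timed out after {deadline}s")

trio.run(build_with_deadline, 50)
```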
5.3 性能关键路径
瓶颈分析:
- LLM调用:占总时间 > 90%
  - 优化:Chunk合并(减少调用次数)、并发调用、LLM Cache
- 图谱合并:全局锁串行执行
  - 优化:增量合并(仅更新变更的节点/边)
- Elasticsearch写入:批量写入
  - 优化:bulk_size=4,避免单次写入过大
Token优化:
- Chunk合并:小chunk合并到1024 tokens以内
- 描述总结:超过12个描述调用LLM总结(避免context过长)
- 上下文截断:检索时max_token=8196,超出则截断
5.4 配置项
| 配置项 | 默认值 | 说明 |
|---|---|---|
| method | "light" | 模式选择:"light" 或 "general" |
| resolution | True | 是否实体消歧 |
| community | False | 是否社区检测 |
| language | "English" | 语言(影响Prompt) |
| entity_types | ["Person","Organization",…] | 实体类型白名单 |
| max_gleanings | 2 | Gleaning最大次数 |
| max_parallel_docs | 4 | 最大并行文档数 |
| max_concurrent_chunks | 10 | 最大并行chunk数 |
6. 最佳实践与性能建议
6.1 选择合适的模式
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 文档 < 100 篇,预算有限 | Light | 不调用LLM,成本低 |
| 文档 > 1000 篇,高精度要求 | General + resolution + community | 完整图谱,支持复杂查询 |
| 实时性要求高 | Light | 构建速度快(< 1分钟) |
| 多语言混合文档 | General | LLM对多语言支持更好 |
6.2 实体类型设计
通用领域:Person, Organization, Location, Event, Category
金融领域:Company, Stock, Indicator, Policy, Market
医疗领域:Disease, Drug, Symptom, Treatment, Organ
建议:
- 类型数量控制在 5-10 个
- 避免过细粒度(如将
Person拆分为CEO,Engineer等)
6.3 Prompt工程
- Few-shot示例:在Prompt中提供2-3个示例,提升LLM准确率20-30%
- 结构化输出:使用固定分隔符(##、<|>),方便解析
- 语言一致性:Prompt和文档使用相同语言
6.4 成本优化
LLM成本(以GPT-4为例):
- Light模式:$0(不调用LLM)
- General模式:每1000 chunks约$5-10
- 实体消歧:+20%(仅对候选对调用)
- 社区摘要:+10%(每个社区1次调用)
优化建议:
- 使用更便宜的模型(如GPT-3.5-turbo)
- 增大chunk大小(减少调用次数)
- 缓存重复查询(LLM Cache命中率可达30-50%)
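据此可以做一个粗略的成本估算(单价、单次 Token 量、缓存命中率均为假设值,请按实际模型定价调整):

```python
# GraphRAG 构建成本粗估示意(所有参数均为假设值)
def estimate_cost(num_chunks: int,
                  tokens_per_call: int = 2000,       # 假设:单次调用 Prompt+输出约 2k tokens
                  calls_per_chunk: int = 3,          # 1 次主提取 + 最多 2 次 gleaning
                  price_per_1k_tokens: float = 0.002,  # 假设的混合单价
                  cache_hit_rate: float = 0.3) -> float:
    effective_calls = num_chunks * calls_per_chunk * (1 - cache_hit_rate)
    return effective_calls * tokens_per_call / 1000 * price_per_1k_tokens

print(f"1000 chunks ≈ ${estimate_cost(1000):.2f}")   # 约 $8.40(按上述假设)
```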
7. 监控与可观测性
7.1 关键指标
任务级别:
- graphrag_task_duration_seconds:任务总耗时
- graphrag_task_success_rate:任务成功率
- graphrag_documents_processed:处理的文档数
- graphrag_chunks_processed:处理的chunk数
LLM级别:
- llm_call_count:LLM调用次数
- llm_token_usage:Token消耗
- llm_cache_hit_rate:缓存命中率
图谱级别:
- graph_entities_count:实体数量
- graph_relations_count:关系数量
- graph_communities_count:社区数量
- graph_merge_duration_seconds:合并耗时
7.2 日志追踪
进度日志(通过callback实时输出):
[GraphRAG] build_subgraph doc:xxx start (chunks=50, timeout=3000s)
[GraphRAG] Entities extraction of chunk 10/50 done, 25 nodes, 18 edges
[GraphRAG] build_subgraph doc:xxx done
[GraphRAG] kb:yyy merge lock acquired
[GraphRAG] kb:yyy merge finished, graph ready
[GraphRAG] Resolved 150 candidate pairs, 45 of them are selected to merge
[GraphRAG] Community reports done in 120.5s, used tokens: 50000
[GraphRAG] GraphRAG for KB yyy done in 1200.50 seconds
8. 故障排查与常见问题
8.1 任务失败
问题:任务进度显示 -1(失败)
排查:
- 查看任务日志:/api/v1/kb/trace_graphrag?kb_id=xxx
- 检查LLM服务是否正常
- 检查Elasticsearch连接
- 检查Redis分布式锁是否超时
常见原因:
- LLM服务超时或返回错误
- Elasticsearch磁盘满
- Redis锁超时(20分钟)
8.2 图谱质量问题
问题:实体识别不准确
优化:
- 调整entity_types:针对领域定制
- 使用Few-shot示例:在Prompt中添加示例
- 切换更强的LLM模型(如GPT-4)

问题:实体过多重复
优化:
- 开启resolution=True
- 调整相似度阈值(is_similarity())

问题:关系抽取遗漏
优化:
- 增加max_gleanings(默认2)
- 减小chunk大小(确保单个chunk不超过1024 tokens)