GraphRAG-16-实战指南与最佳实践
1. 完整使用示例
示例 1:从零开始构建知识图谱
import asyncio
import pandas as pd
from pathlib import Path
from graphrag.config import load_config, create_graphrag_config
from graphrag.api import build_index
async def build_knowledge_graph():
# 步骤 1: 初始化项目(命令行执行)
# graphrag init --root ./my_graphrag_project
# 步骤 2: 准备输入数据
project_root = Path("./my_graphrag_project")
input_dir = project_root / "input"
input_dir.mkdir(exist_ok=True)
# 将文档复制到 input/ 目录
# 支持的格式:txt, csv, json
# 步骤 3: 加载或创建配置
config = create_graphrag_config(
values={
"root_dir": str(project_root),
"chunks": {"size": 400, "overlap": 100},
"extract_graph": {
"entity_types": ["organization", "person", "location", "event"],
"concurrent_requests": 25,
},
"models": {
"default": {
"type": "openai_chat",
"model": "gpt-4o-mini",
"api_key": "${OPENAI_API_KEY}",
}
},
}
)
# 步骤 4: 构建索引
print("开始构建知识图谱索引...")
results = await build_index(
config=config,
method="standard", # 或 "fast"(跳过 LLM,使用 NLP)
verbose=True,
)
# 步骤 5: 检查结果
for result in results:
print(f"工作流: {result.workflow}")
print(f"运行时间: {result.total_runtime:.2f}s")
if result.errors:
print(f"错误: {result.errors}")
print("\n索引构建完成!输出目录:", project_root / "output")
print("生成的文件:")
print(" - entities.parquet: 实体表")
print(" - relationships.parquet: 关系表")
print(" - communities.parquet: 社区表")
print(" - community_reports.parquet: 社区报告")
print(" - text_units.parquet: 文本块")
asyncio.run(build_knowledge_graph())
示例 2:执行多种查询
import asyncio
import pandas as pd
from graphrag.config import load_config
from graphrag.api import local_search, global_search, drift_search
async def run_queries():
# 1. 加载配置和索引数据
config = load_config(root_dir="./my_graphrag_project")
output_dir = "./my_graphrag_project/output"
entities = pd.read_parquet(f"{output_dir}/entities.parquet")
relationships = pd.read_parquet(f"{output_dir}/relationships.parquet")
text_units = pd.read_parquet(f"{output_dir}/text_units.parquet")
communities = pd.read_parquet(f"{output_dir}/communities.parquet")
community_reports = pd.read_parquet(f"{output_dir}/community_reports.parquet")
# 2. 局部搜索示例(适合特定实体问题)
print("\n=== 局部搜索 ===")
query1 = "What is the relationship between Microsoft and OpenAI?"
response1, context1 = await local_search(
config=config,
entities=entities,
communities=communities,
community_reports=community_reports,
text_units=text_units,
relationships=relationships,
covariates=None,
community_level=2,
response_type="Multiple Paragraphs",
query=query1,
)
print(f"问题: {query1}")
print(f"答案: {response1}")
print(f"使用的实体数: {len(context1.get('entities', []))}")
# 3. 全局搜索示例(适合宏观主题问题)
print("\n=== 全局搜索 ===")
query2 = "What are the main trends in AI research?"
response2, context2 = await global_search(
config=config,
entities=entities,
communities=communities,
community_reports=community_reports,
community_level=2,
dynamic_community_selection=False,
response_type="Multiple Paragraphs",
query=query2,
)
print(f"问题: {query2}")
print(f"答案: {response2}")
# 4. DRIFT 搜索示例(混合模式)
print("\n=== DRIFT 搜索 ===")
query3 = "How has Microsoft's AI strategy evolved?"
response3, context3 = await drift_search(
config=config,
entities=entities,
communities=communities,
community_reports=community_reports,
text_units=text_units,
relationships=relationships,
community_level=2,
response_type="Multiple Paragraphs",
query=query3,
)
print(f"问题: {query3}")
print(f"答案: {response3}")
asyncio.run(run_queries())
示例 3:流式查询(降低延迟)
import asyncio
from graphrag.api import local_search_streaming
async def streaming_query():
# 配置和数据加载(同上)
...
print("正在生成答案(流式输出): ", end="", flush=True)
async for chunk in local_search_streaming(
config=config,
entities=entities,
communities=communities,
community_reports=community_reports,
text_units=text_units,
relationships=relationships,
covariates=None,
community_level=2,
response_type="Multiple Paragraphs",
query="Explain the impact of AI on healthcare.",
):
print(chunk, end="", flush=True)
print("\n\n答案生成完成!")
asyncio.run(streaming_query())
2. 实战经验
2.1 文档准备
格式选择:
- 纯文本(.txt):适合长文档、书籍、报告
- CSV:适合结构化数据(包含元数据)
- 必需列:
id,text - 可选列:
title,metadata(JSON字符串)
- 必需列:
- JSON:适合复杂结构数据
文档预处理:
import pandas as pd
# CSV 格式准备
documents = pd.DataFrame({
"id": ["doc1", "doc2"],
"title": ["AI Overview", "LLM Applications"],
"text": ["...", "..."],
"metadata": ['{"source": "paper", "year": 2024}', '{"source": "blog"}'],
})
documents.to_csv("./my_graphrag_project/input/documents.csv", index=False)
文档规模建议:
- 小规模:< 1000 文档(适合快速实验)
- 中规模:1000-10000 文档(适合企业知识库)
- 大规模:> 10000 文档(需要性能优化)
2.2 配置调优策略
快速原型(开发阶段):
chunks:
size: 300
overlap: 50
extract_graph:
concurrent_requests: 10 # 降低并发
entity_types: ["organization", "person"] # 减少实体类型
cache:
type: memory # 快速缓存
models:
default:
model: gpt-4o-mini # 使用快速模型
生产环境:
chunks:
size: 600
overlap: 150
extract_graph:
concurrent_requests: 50 # 提高并发
entity_types: ["organization", "person", "location", "event", "technology"]
max_gleanings: 2 # 迭代抽取
cache:
type: file
base_dir: "./cache"
models:
default:
model: gpt-4-turbo # 高质量模型
community_reports: # 专用模型
model: gpt-4o
2.3 成本优化
降低成本策略:
- 启用缓存:减少 60-80% 的 LLM 调用
cache:
type: file
base_dir: "./cache"
- 使用 Fast 模式:跳过 LLM 抽取
await build_index(config=config, method="fast")
-
模型选择:
- 实体抽取:
gpt-4o-mini(平衡质量和成本) - 社区报告:
gpt-4o-mini或gpt-3.5-turbo - 查询生成:
gpt-4-turbo(高质量答案)
- 实体抽取:
-
限制实体类型:
extract_graph:
entity_types: ["organization", "person"] # 仅抽取关键类型
成本估算:
- 1000 文档(每文档 500 tokens):
- Standard 模式:约 $10-30(取决于模型)
- Fast 模式:约 $2-5
2.4 性能优化
索引构建加速:
- 增加并发度:
extract_graph:
concurrent_requests: 50 # Azure OpenAI: 根据 TPM 限制调整
- 使用 SSD 存储:
storage:
type: file
base_dir: "/mnt/fast-ssd/graphrag-output"
- 启用嵌入缓存:
# 预先计算嵌入,避免重复计算
- 分批处理大规模文档:
# 将 10000 文档分为 10 批,每批 1000
for batch in batches:
await build_index(config=config, input_documents=batch)
查询加速:
- 使用向量数据库:
vector_store:
entity_description_embedding:
type: lancedb # 或 azure_ai_search
container_name: graphrag-embeddings
- 流式查询:
# 降低首 token 延迟
async for chunk in local_search_streaming(...):
print(chunk, end="")
- 调整 Token 限制:
local_search:
max_context_tokens: 3000 # 减少上下文,加快响应
3. 最佳实践
3.1 领域定制
金融领域示例:
extract_graph:
entity_types:
- company
- person
- financial_instrument
- regulation
- event
prompt: |
Extract entities and relationships from financial documents.
Focus on:
- Corporate actions (M&A, IPO, etc.)
- Regulatory compliance
- Financial performance metrics
医疗领域示例:
extract_graph:
entity_types:
- disease
- drug
- person
- organization
- clinical_trial
3.2 提示词调优
使用自动提示词生成:
from graphrag.api import generate_indexing_prompts
# 根据领域数据自动生成定制化提示词
await generate_indexing_prompts(
config=config,
domain="medical research", # 指定领域
selection_method="auto",
n_subset_max=100,
output="./prompts",
)
手动调整提示词:
extract_graph:
prompt_file: "./prompts/custom_entity_extraction.txt"
community_reports:
prompt_file: "./prompts/custom_community_report.txt"
3.3 增量更新策略
何时使用增量更新:
- 新增文档 < 10% 现有文档
- 不涉及配置变更
增量更新执行:
# 将新文档放入 input/ 目录
await build_index(
config=config,
method="standard",
is_update_run=True, # 关键参数
)
定期全量重建:
- 每月或每季度执行一次全量重建
- 清理冗余实体和关系
3.4 质量监控
关键指标:
# 索引质量
- 实体数量:await len(entities)
- 关系数量:await len(relationships)
- 社区数量:await len(communities)
- 平均社区大小:await communities["size"].mean()
# 查询质量
- 响应时间:measure_latency()
- 上下文相关性:manual_evaluation()
- 答案准确性:compare_with_ground_truth()
日志分析:
# 分析工作流日志
logs = pd.read_csv("./output/logs/indexing.log")
print(logs[logs["level"] == "ERROR"])
4. 常见问题与解决方案
Q1: 实体抽取质量不佳
问题:抽取的实体不准确或缺失关键实体
解决方案:
- 使用提示词调优工具
- 增加
max_gleanings参数(迭代抽取) - 调整
entity_types - 使用更强大的模型(如 gpt-4-turbo)
Q2: 社区划分不合理
问题:社区过大或过小
解决方案:
- 调整
max_cluster_size(10-50) - 启用
use_lcc=False(包含孤立节点) - 检查关系抽取质量
Q3: 查询响应时间长
问题:查询超过 30 秒
解决方案:
- 使用流式查询
- 减少
max_context_tokens - 启用向量数据库
- 降低
top_k_entities参数
Q4: 内存不足
问题:索引构建时内存溢出
解决方案:
- 减少
concurrent_requests - 增加
chunks.size(减少文本块数量) - 分批处理文档
5. 高级用例
用例 1:多语言支持
chunks:
encoding_model: cl100k_base # 支持多语言
models:
default:
model: gpt-4o # 多语言支持更好
language: "chinese" # 指定主要语言
extract_graph:
prompt: |
请从中文文本中提取实体和关系。
用例 2:自定义工作流
from graphrag.index.workflows.factory import PipelineFactory
# 注册自定义工作流
@PipelineFactory.register("custom_processing")
async def custom_workflow(config, context):
# 自定义处理逻辑
data = await load_table_from_storage("entities", context.output_storage)
# ... 处理 ...
await write_table_to_storage(processed_data, "entities", context.output_storage)
return WorkflowFunctionOutput(result=processed_data)
# 使用自定义工作流
config.workflows = [
"load_input_documents",
"create_base_text_units",
"extract_graph",
"custom_processing", # 插入自定义工作流
"create_communities",
]
用例 3:集成到现有应用
from fastapi import FastAPI
from graphrag.api import local_search
app = FastAPI()
# 全局加载索引(启动时)
@app.on_event("startup")
async def load_index():
global entities, relationships, ...
entities = pd.read_parquet("./output/entities.parquet")
# ...
# 查询端点
@app.post("/query")
async def query_endpoint(query: str, mode: str = "local"):
response, context = await local_search(
config=config,
entities=entities,
# ...
query=query,
)
return {"answer": response, "context": context}
6. 总结
GraphRAG 提供了强大的知识图谱构建和查询能力。通过合理的配置、优化和领域定制,可以在各种场景下实现高质量的检索增强生成。
关键要点:
- 根据场景选择 Standard/Fast 模式
- 启用缓存以降低成本
- 调整并发度以平衡速度和资源
- 使用提示词调优提高质量
- 定期监控和优化