GraphRAG-16-实战指南与最佳实践

1. 完整使用示例

示例 1:从零开始构建知识图谱

import asyncio
import pandas as pd
from pathlib import Path
from graphrag.config import load_config, create_graphrag_config
from graphrag.api import build_index

async def build_knowledge_graph():
    # 步骤 1: 初始化项目(命令行执行)
    # graphrag init --root ./my_graphrag_project
    
    # 步骤 2: 准备输入数据
    project_root = Path("./my_graphrag_project")
    input_dir = project_root / "input"
    input_dir.mkdir(exist_ok=True)
    
    # 将文档复制到 input/ 目录
    # 支持的格式:txt, csv, json
    
    # 步骤 3: 加载或创建配置
    config = create_graphrag_config(
        values={
            "root_dir": str(project_root),
            "chunks": {"size": 400, "overlap": 100},
            "extract_graph": {
                "entity_types": ["organization", "person", "location", "event"],
                "concurrent_requests": 25,
            },
            "models": {
                "default": {
                    "type": "openai_chat",
                    "model": "gpt-4o-mini",
                    "api_key": "${OPENAI_API_KEY}",
                }
            },
        }
    )
    
    # 步骤 4: 构建索引
    print("开始构建知识图谱索引...")
    results = await build_index(
        config=config,
        method="standard",  # 或 "fast"(跳过 LLM,使用 NLP)
        verbose=True,
    )
    
    # 步骤 5: 检查结果
    for result in results:
        print(f"工作流: {result.workflow}")
        print(f"运行时间: {result.total_runtime:.2f}s")
        if result.errors:
            print(f"错误: {result.errors}")
    
    print("\n索引构建完成!输出目录:", project_root / "output")
    print("生成的文件:")
    print("  - entities.parquet: 实体表")
    print("  - relationships.parquet: 关系表")
    print("  - communities.parquet: 社区表")
    print("  - community_reports.parquet: 社区报告")
    print("  - text_units.parquet: 文本块")

asyncio.run(build_knowledge_graph())

示例 2:执行多种查询

import asyncio
import pandas as pd
from graphrag.config import load_config
from graphrag.api import local_search, global_search, drift_search

async def run_queries():
    # 1. 加载配置和索引数据
    config = load_config(root_dir="./my_graphrag_project")
    output_dir = "./my_graphrag_project/output"
    
    entities = pd.read_parquet(f"{output_dir}/entities.parquet")
    relationships = pd.read_parquet(f"{output_dir}/relationships.parquet")
    text_units = pd.read_parquet(f"{output_dir}/text_units.parquet")
    communities = pd.read_parquet(f"{output_dir}/communities.parquet")
    community_reports = pd.read_parquet(f"{output_dir}/community_reports.parquet")
    
    # 2. 局部搜索示例(适合特定实体问题)
    print("\n=== 局部搜索 ===")
    query1 = "What is the relationship between Microsoft and OpenAI?"
    response1, context1 = await local_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        covariates=None,
        community_level=2,
        response_type="Multiple Paragraphs",
        query=query1,
    )
    print(f"问题: {query1}")
    print(f"答案: {response1}")
    print(f"使用的实体数: {len(context1.get('entities', []))}")
    
    # 3. 全局搜索示例(适合宏观主题问题)
    print("\n=== 全局搜索 ===")
    query2 = "What are the main trends in AI research?"
    response2, context2 = await global_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        community_level=2,
        dynamic_community_selection=False,
        response_type="Multiple Paragraphs",
        query=query2,
    )
    print(f"问题: {query2}")
    print(f"答案: {response2}")
    
    # 4. DRIFT 搜索示例(混合模式)
    print("\n=== DRIFT 搜索 ===")
    query3 = "How has Microsoft's AI strategy evolved?"
    response3, context3 = await drift_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        community_level=2,
        response_type="Multiple Paragraphs",
        query=query3,
    )
    print(f"问题: {query3}")
    print(f"答案: {response3}")

asyncio.run(run_queries())

示例 3:流式查询(降低延迟)

import asyncio
from graphrag.api import local_search_streaming

async def streaming_query():
    # 配置和数据加载(同上)
    ...
    
    print("正在生成答案(流式输出): ", end="", flush=True)
    
    async for chunk in local_search_streaming(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        covariates=None,
        community_level=2,
        response_type="Multiple Paragraphs",
        query="Explain the impact of AI on healthcare.",
    ):
        print(chunk, end="", flush=True)
    
    print("\n\n答案生成完成!")

asyncio.run(streaming_query())

2. 实战经验

2.1 文档准备

格式选择

  • 纯文本(.txt):适合长文档、书籍、报告
  • CSV:适合结构化数据(包含元数据)
    • 必需列:id, text
    • 可选列:title, metadata(JSON字符串)
  • JSON:适合复杂结构数据

文档预处理

import pandas as pd

# CSV 格式准备
documents = pd.DataFrame({
    "id": ["doc1", "doc2"],
    "title": ["AI Overview", "LLM Applications"],
    "text": ["...", "..."],
    "metadata": ['{"source": "paper", "year": 2024}', '{"source": "blog"}'],
})
documents.to_csv("./my_graphrag_project/input/documents.csv", index=False)

文档规模建议

  • 小规模:< 1000 文档(适合快速实验)
  • 中规模:1000-10000 文档(适合企业知识库)
  • 大规模:> 10000 文档(需要性能优化)

2.2 配置调优策略

快速原型(开发阶段):

chunks:
  size: 300
  overlap: 50

extract_graph:
  concurrent_requests: 10  # 降低并发
  entity_types: ["organization", "person"]  # 减少实体类型

cache:
  type: memory  # 快速缓存

models:
  default:
    model: gpt-4o-mini  # 使用快速模型

生产环境

chunks:
  size: 600
  overlap: 150

extract_graph:
  concurrent_requests: 50  # 提高并发
  entity_types: ["organization", "person", "location", "event", "technology"]
  max_gleanings: 2  # 迭代抽取

cache:
  type: file
  base_dir: "./cache"

models:
  default:
    model: gpt-4-turbo  # 高质量模型
  
  community_reports:  # 专用模型
    model: gpt-4o

2.3 成本优化

降低成本策略

  1. 启用缓存:减少 60-80% 的 LLM 调用
cache:
  type: file
  base_dir: "./cache"
  1. 使用 Fast 模式:跳过 LLM 抽取
await build_index(config=config, method="fast")
  1. 模型选择

    • 实体抽取:gpt-4o-mini(平衡质量和成本)
    • 社区报告:gpt-4o-minigpt-3.5-turbo
    • 查询生成:gpt-4-turbo(高质量答案)
  2. 限制实体类型

extract_graph:
  entity_types: ["organization", "person"]  # 仅抽取关键类型

成本估算

  • 1000 文档(每文档 500 tokens):
    • Standard 模式:约 $10-30(取决于模型)
    • Fast 模式:约 $2-5

2.4 性能优化

索引构建加速

  1. 增加并发度
extract_graph:
  concurrent_requests: 50  # Azure OpenAI: 根据 TPM 限制调整
  1. 使用 SSD 存储
storage:
  type: file
  base_dir: "/mnt/fast-ssd/graphrag-output"
  1. 启用嵌入缓存
# 预先计算嵌入,避免重复计算
  1. 分批处理大规模文档
# 将 10000 文档分为 10 批,每批 1000
for batch in batches:
    await build_index(config=config, input_documents=batch)

查询加速

  1. 使用向量数据库
vector_store:
  entity_description_embedding:
    type: lancedb  # 或 azure_ai_search
    container_name: graphrag-embeddings
  1. 流式查询
# 降低首 token 延迟
async for chunk in local_search_streaming(...):
    print(chunk, end="")
  1. 调整 Token 限制
local_search:
  max_context_tokens: 3000  # 减少上下文,加快响应

3. 最佳实践

3.1 领域定制

金融领域示例

extract_graph:
  entity_types:
    - company
    - person
    - financial_instrument
    - regulation
    - event
  
  prompt: |
    Extract entities and relationships from financial documents.
    Focus on:
    - Corporate actions (M&A, IPO, etc.)
    - Regulatory compliance
    - Financial performance metrics

医疗领域示例

extract_graph:
  entity_types:
    - disease
    - drug
    - person
    - organization
    - clinical_trial

3.2 提示词调优

使用自动提示词生成

from graphrag.api import generate_indexing_prompts

# 根据领域数据自动生成定制化提示词
await generate_indexing_prompts(
    config=config,
    domain="medical research",  # 指定领域
    selection_method="auto",
    n_subset_max=100,
    output="./prompts",
)

手动调整提示词

extract_graph:
  prompt_file: "./prompts/custom_entity_extraction.txt"

community_reports:
  prompt_file: "./prompts/custom_community_report.txt"

3.3 增量更新策略

何时使用增量更新

  • 新增文档 < 10% 现有文档
  • 不涉及配置变更

增量更新执行

# 将新文档放入 input/ 目录
await build_index(
    config=config,
    method="standard",
    is_update_run=True,  # 关键参数
)

定期全量重建

  • 每月或每季度执行一次全量重建
  • 清理冗余实体和关系

3.4 质量监控

关键指标

# 索引质量
- 实体数量await len(entities)
- 关系数量await len(relationships)
- 社区数量await len(communities)
- 平均社区大小await communities["size"].mean()

# 查询质量
- 响应时间measure_latency()
- 上下文相关性manual_evaluation()
- 答案准确性compare_with_ground_truth()

日志分析

# 分析工作流日志
logs = pd.read_csv("./output/logs/indexing.log")
print(logs[logs["level"] == "ERROR"])

4. 常见问题与解决方案

Q1: 实体抽取质量不佳

问题:抽取的实体不准确或缺失关键实体

解决方案

  1. 使用提示词调优工具
  2. 增加 max_gleanings 参数(迭代抽取)
  3. 调整 entity_types
  4. 使用更强大的模型(如 gpt-4-turbo)

Q2: 社区划分不合理

问题:社区过大或过小

解决方案

  1. 调整 max_cluster_size(10-50)
  2. 启用 use_lcc=False(包含孤立节点)
  3. 检查关系抽取质量

Q3: 查询响应时间长

问题:查询超过 30 秒

解决方案

  1. 使用流式查询
  2. 减少 max_context_tokens
  3. 启用向量数据库
  4. 降低 top_k_entities 参数

Q4: 内存不足

问题:索引构建时内存溢出

解决方案

  1. 减少 concurrent_requests
  2. 增加 chunks.size(减少文本块数量)
  3. 分批处理文档

5. 高级用例

用例 1:多语言支持

chunks:
  encoding_model: cl100k_base  # 支持多语言

models:
  default:
    model: gpt-4o  # 多语言支持更好
    language: "chinese"  # 指定主要语言

extract_graph:
  prompt: |
    请从中文文本中提取实体和关系。

用例 2:自定义工作流

from graphrag.index.workflows.factory import PipelineFactory

# 注册自定义工作流
@PipelineFactory.register("custom_processing")
async def custom_workflow(config, context):
    # 自定义处理逻辑
    data = await load_table_from_storage("entities", context.output_storage)
    # ... 处理 ...
    await write_table_to_storage(processed_data, "entities", context.output_storage)
    return WorkflowFunctionOutput(result=processed_data)

# 使用自定义工作流
config.workflows = [
    "load_input_documents",
    "create_base_text_units",
    "extract_graph",
    "custom_processing",  # 插入自定义工作流
    "create_communities",
]

用例 3:集成到现有应用

from fastapi import FastAPI
from graphrag.api import local_search

app = FastAPI()

# 全局加载索引(启动时)
@app.on_event("startup")
async def load_index():
    global entities, relationships, ...
    entities = pd.read_parquet("./output/entities.parquet")
    # ...

# 查询端点
@app.post("/query")
async def query_endpoint(query: str, mode: str = "local"):
    response, context = await local_search(
        config=config,
        entities=entities,
        # ...
        query=query,
    )
    return {"answer": response, "context": context}

6. 总结

GraphRAG 提供了强大的知识图谱构建和查询能力。通过合理的配置、优化和领域定制,可以在各种场景下实现高质量的检索增强生成。

关键要点

  • 根据场景选择 Standard/Fast 模式
  • 启用缓存以降低成本
  • 调整并发度以平衡速度和资源
  • 使用提示词调优提高质量
  • 定期监控和优化