GraphRAG-05-AI服务层与支撑模块
一、语言模型(Language Model)
1.1 模块概览
Language Model 模块封装了与各种 LLM 提供商的交互,提供统一的调用接口,支持聊天模型和嵌入模型。
1.2 核心组件
ModelManager(模型管理器):
- 单例模式管理模型实例
- 支持模型实例复用
- 自动根据配置创建模型
ChatModel(聊天模型接口):
class ChatModel:
    async def call(
        self,
        messages: list[dict],
        callbacks: list[Callback] | None = None,
        cache: PipelineCache | None = None,
        **kwargs,
    ) -> str:
        """调用聊天模型生成响应"""
    
    async def stream(
        self,
        messages: list[dict],
        **kwargs,
    ) -> AsyncGenerator[str, None]:
        """流式生成响应"""
EmbeddingModel(嵌入模型接口):
class EmbeddingModel:
    async def embed(
        self,
        texts: list[str],
        **kwargs,
    ) -> list[list[float]]:
        """批量生成文本嵌入向量"""
1.3 支持的提供商
| 提供商 | 聊天模型 | 嵌入模型 | 配置示例 | 
|---|---|---|---|
| OpenAI | ✅ | ✅ | type: openai_chat/openai_embedding | 
| Azure OpenAI | ✅ | ✅ | type: azure_openai_chat/azure_openai_embedding | 
| LiteLLM | ✅ | ✅ | type: litellm | 
1.4 配置示例
OpenAI 配置:
models:
  default:
    type: openai_chat
    model: gpt-4o-mini
    api_key: ${OPENAI_API_KEY}
    max_tokens: 4000
    temperature: 0.0
    request_timeout: 180
  
  embedding:
    type: openai_embedding
    model: text-embedding-3-small
    api_key: ${OPENAI_API_KEY}
Azure OpenAI 配置:
models:
  default:
    type: azure_openai_chat
    model: gpt-4
    api_base: ${AZURE_OPENAI_ENDPOINT}
    api_key: ${AZURE_OPENAI_API_KEY}
    api_version: "2024-02-15-preview"
    deployment_name: my-gpt4-deployment
LiteLLM 配置(支持多种提供商):
models:
  default:
    type: litellm
    model: anthropic/claude-3-sonnet-20240229
    api_key: ${ANTHROPIC_API_KEY}
1.5 核心功能
重试与容错
指数退避重试:
# 配置重试策略
retry_config = {
    "max_retries": 10,
    "initial_delay": 1.0,
    "max_delay": 60.0,
    "exponential_base": 2.0,
}
错误处理:
- RateLimitError(429):自动重试
- TimeoutError:重试或降级
- APIError(500+):重试
- AuthenticationError:不重试,直接报错
缓存机制
LLM 调用缓存:
# 缓存键生成
cache_key = gen_sha512_hash({
    "messages": messages,
    "model": model_config.model,
    "temperature": model_config.temperature,
    "max_tokens": model_config.max_tokens,
})
# 检查缓存
cached_response = await cache.get(cache_key)
if cached_response:
    return cached_response
# 调用 LLM
response = await llm.call(messages)
# 存储缓存
await cache.set(cache_key, response)
速率限制
并发控制:
# 使用 asyncio.Semaphore 控制并发
semaphore = asyncio.Semaphore(concurrent_requests)
async with semaphore:
    response = await llm.call(messages)
二、提示词调优(Prompt Tune)
2.1 模块概览
Prompt Tune 模块根据领域数据自动生成定制化的提示词模板,提高实体抽取和社区报告的质量。
2.2 核心流程
flowchart LR
    Input[输入文档] --> Selection[文档选择]
    Selection --> Chunking[文本分块]
    Chunking --> Analysis[领域分析]
    Analysis --> EntityDiscovery[实体类型发现]
    EntityDiscovery --> PromptGen[提示词生成]
    PromptGen --> Output[输出提示词模板]
2.3 文档选择方法
RANDOM(随机选择):
- 随机选择 N 个文档
- 适合数据分布均匀的场景
- 参数:limit=15
TOP(头部选择):
- 选择前 N 个文档
- 适合按时间或重要性排序的数据
- 参数:limit=15
AUTO(自动聚类选择):
- 使用 K-means 聚类
- 从每个聚类中选择代表性文档
- 适合数据分布不均匀的场景
- 参数:n_subset_max=300,k=15
2.4 生成的提示词
实体抽取提示词(entity_extraction.txt):
You are an AI assistant that helps extract entities and relationships from text.
Domain: {domain}
Entity Types:
{entity_types}
Instructions:
1. Identify all entities of the specified types
2. Extract relationships between entities
3. Provide descriptions for each entity and relationship
Examples:
{examples}
Now, extract entities and relationships from the following text:
{text}
Output format: JSON
社区报告提示词(community_report.txt):
Generate a comprehensive report for the given community.
Community Context:
{context}
Include:
1. Summary of the community
2. Key findings
3. Relationships between entities
4. Important themes
Output format: JSON with fields: title, summary, findings, rating
2.5 使用示例
Python API:
from graphrag.api import generate_indexing_prompts
await generate_indexing_prompts(
    config=config,
    domain="financial technology",
    selection_method="auto",
    limit=50,
    language="chinese",
    discover_entity_types=True,
    output=Path("./custom_prompts"),
)
CLI:
graphrag prompt-tune \
  --domain "medical research" \
  --selection-method auto \
  --n-subset-max 500 \
  --k 20 \
  --output ./custom_prompts
三、提示词模板(Prompts)
3.1 默认提示词模板
GraphRAG 提供了预制的提示词模板,位于 graphrag/prompts/ 目录。
3.2 模板类型
索引阶段提示词:
- index/entity_extraction.py:实体抽取提示词
- index/summarize_descriptions.py:描述摘要提示词
- index/community_report.py:社区报告提示词
- index/claim_extraction.py:声明抽取提示词(协变量)
查询阶段提示词:
- query/local_search.py:局部搜索系统提示词
- query/global_search_map.py:全局搜索 Map 提示词
- query/global_search_reduce.py:全局搜索 Reduce 提示词
- query/drift_search_local.py:DRIFT 搜索局部提示词
- query/drift_search_reduce.py:DRIFT 搜索 Reduce 提示词
3.3 自定义提示词
方法 1:使用配置文件:
extract_graph:
  prompt_file: "./prompts/custom_entity_extraction.txt"
community_reports:
  prompt_file: "./prompts/custom_community_report.txt"
local_search:
  prompt_file: "./prompts/custom_local_search.txt"
方法 2:直接修改模板文件:
# graphrag/prompts/index/entity_extraction.py
ENTITY_EXTRACTION_PROMPT = """
Your custom prompt here...
Entity Types: {entity_types}
Text: {text}
"""
四、分词器(Tokenizer)
4.1 模块概览
Tokenizer 模块提供 Token 计数和文本编码功能,用于:
- 文本分块大小控制
- 上下文 Token 预算管理
- LLM API 参数验证
4.2 支持的分词器
TiktokenTokenizer(推荐):
- 基于 OpenAI 的 tiktoken库
- 支持 GPT-3.5/GPT-4 系列模型
- 编码模型:cl100k_base、p50k_base等
LiteLLMTokenizer:
- 基于 LiteLLM 的 token 计数
- 支持多种模型的 token 计数
- 自动根据模型名称选择编码
4.3 核心 API
from graphrag.tokenizer import get_tokenizer
# 获取分词器
tokenizer = get_tokenizer(model_config=model_config)
# 编码文本为 tokens
tokens = tokenizer.encode("Hello, world!")
print(f"Token count: {len(tokens)}")
# 解码 tokens 为文本
text = tokenizer.decode(tokens)
# 直接计数(不返回 tokens)
count = tokenizer.count("Hello, world!")
4.4 配置示例
chunks:
  encoding_model: cl100k_base  # Tiktoken 编码模型
五、向量存储(Vector Stores)
5.1 模块概览
Vector Stores 模块提供向量嵌入的存储和检索功能,支持多种向量数据库。
5.2 支持的向量存储
| 向量存储 | 类型 | 适用场景 | 
|---|---|---|
| LanceDB | 本地文件数据库 | 开发测试、小规模部署 | 
| Azure AI Search | 云服务 | 生产环境、大规模部署 | 
| Azure Cosmos DB | 云服务 | 全球分发、高可用 | 
5.3 BaseVectorStore 接口
class BaseVectorStore(ABC):
    @abstractmethod
    async def load_documents(
        self,
        documents: list[dict],
    ) -> None:
        """批量加载文档和嵌入向量"""
    
    @abstractmethod
    async def similarity_search(
        self,
        query_embedding: list[float],
        k: int = 10,
        **kwargs,
    ) -> list[dict]:
        """向量相似度搜索"""
    
    @abstractmethod
    async def get_document(
        self,
        document_id: str,
    ) -> dict | None:
        """根据 ID 获取文档"""
5.4 配置示例
LanceDB:
vector_store:
  entity_description_embedding:
    type: lancedb
    db_uri: "./lancedb"
    container_name: entity-description
    overwrite: false
Azure AI Search:
vector_store:
  entity_description_embedding:
    type: azure_ai_search
    api_key: ${AZURE_SEARCH_API_KEY}
    endpoint: ${AZURE_SEARCH_ENDPOINT}
    index_name: graphrag-entities
Azure Cosmos DB:
vector_store:
  entity_description_embedding:
    type: cosmosdb
    connection_string: ${COSMOS_DB_CONNECTION_STRING}
    database_name: graphrag
    container_name: entity-embeddings
5.5 使用示例
写入向量:
# 在嵌入生成后自动写入
await embed_text(
    input=entities,
    embed_column="description",
    strategy={
        "vector_store": {
            "type": "lancedb",
            "db_uri": "./lancedb",
            "container_name": "entities",
        }
    },
)
向量检索:
from graphrag.vector_stores.factory import create_vector_store
# 创建向量存储实例
vector_store = create_vector_store(
    vector_store_type="lancedb",
    db_uri="./lancedb",
    container_name="entities",
)
# 生成查询嵌入
query_embedding = await embedding_model.embed(["What is AI?"])
# 相似度搜索
results = await vector_store.similarity_search(
    query_embedding=query_embedding[0],
    k=10,
)
# 结果包含:id, title, score, embedding
for result in results:
    print(f"ID: {result['id']}, Score: {result['score']}")
六、工厂模式(Factory)
6.1 模块概览
Factory 模块提供统一的对象创建接口,封装复杂的初始化逻辑。
6.2 核心工厂
PipelineFactory(管道工厂):
# 注册工作流
PipelineFactory.register("my_workflow", my_workflow_function)
# 注册管道
PipelineFactory.register_pipeline("my_method", [
    "workflow1",
    "workflow2",
    "workflow3",
])
# 创建管道
pipeline = PipelineFactory.create_pipeline(config, method="my_method")
StorageFactory(存储工厂):
from graphrag.storage.factory import create_storage_from_config
# 根据配置创建存储
storage = create_storage_from_config(config.storage)
CacheFactory(缓存工厂):
from graphrag.cache.factory import create_cache_from_config
# 根据配置创建缓存
cache = create_cache_from_config(config.cache)
VectorStoreFactory(向量存储工厂):
from graphrag.vector_stores.factory import create_vector_store
# 根据类型创建向量存储
vector_store = create_vector_store(
    vector_store_type="lancedb",
    **vector_store_config,
)
七、日志系统(Logger)
7.1 模块概览
Logger 模块提供统一的日志记录功能,支持多种输出目标。
7.2 日志输出目标
标准输出(Console):
- 默认日志目标
- 适合开发和调试
文件(File):
- 日志写入本地文件
- 支持日志轮转
- 路径:{reporting.base_dir}/logs/indexing.log
Blob Storage:
- 日志写入 Azure Blob Storage
- 适合云端部署
7.3 日志级别
| 级别 | 说明 | 使用场景 | 
|---|---|---|
| DEBUG | 调试信息 | 开发调试 | 
| INFO | 一般信息 | 正常运行 | 
| WARNING | 警告信息 | 潜在问题 | 
| ERROR | 错误信息 | 执行失败 | 
7.4 配置示例
reporting:
  type: file
  base_dir: "./logs"
7.5 使用示例
初始化日志:
from graphrag.logger.standard_logging import init_loggers
# 初始化日志系统
init_loggers(
    config=config,
    verbose=True,  # 详细日志
    filename="indexing.log",
)
记录日志:
import logging
logger = logging.getLogger(__name__)
logger.info("索引构建开始")
logger.debug("加载配置: %s", config)
logger.warning("缓存未命中: %s", key)
logger.error("LLM 调用失败: %s", error)
7.6 BlobWorkflowLogger
功能:将工作流日志写入 Blob Storage
使用场景:
- 云端部署
- 需要集中管理日志
- 长期日志存储
配置:
reporting:
  type: blob
  connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
  container_name: graphrag-logs
  base_dir: "project1/logs"
八、模块交互示例
8.1 完整索引构建流程
import asyncio
from graphrag.config import load_config
from graphrag.language_model.manager import ModelManager
from graphrag.cache.factory import create_cache_from_config
from graphrag.storage.factory import create_storage_from_config
from graphrag.logger.standard_logging import init_loggers
from graphrag.api import build_index
async def main():
    # 1. 加载配置
    config = load_config(root_dir="./my_project")
    
    # 2. 初始化日志
    init_loggers(config=config, verbose=True)
    
    # 3. 创建存储和缓存(自动由 API 处理)
    # storage = create_storage_from_config(config.storage)
    # cache = create_cache_from_config(config.cache)
    
    # 4. 构建索引(内部使用 ModelManager 创建 LLM)
    results = await build_index(config=config, method="standard")
    
    print("索引构建完成!")
asyncio.run(main())
8.2 自定义 LLM 调用
from graphrag.language_model.manager import ModelManager
from graphrag.cache.json_pipeline_cache import JsonPipelineCache
# 创建模型管理器
model_manager = ModelManager()
# 创建聊天模型
chat_model = model_manager.get_or_create_chat_model(
    name="my_model",
    model_type="openai_chat",
    config={
        "model": "gpt-4o-mini",
        "api_key": "sk-...",
        "temperature": 0.0,
    },
)
# 创建缓存
cache = JsonPipelineCache(base_dir="./cache")
# 调用模型
response = await chat_model.call(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is AI?"},
    ],
    cache=cache,
)
print(response)
九、最佳实践
9.1 模型选择
索引构建:
- 实体抽取:gpt-4o-mini(平衡质量和成本)
- 社区报告:gpt-4o-mini或gpt-4-turbo
- 嵌入:text-embedding-3-small
查询执行:
- 答案生成:gpt-4-turbo或gpt-4o(高质量)
9.2 缓存策略
开发阶段:
- 使用 memory缓存(快速测试)
- 或使用 json缓存(持久化,便于调试)
生产阶段:
- 使用 file缓存(Parquet 格式,高效)
- 定期清理过期缓存
9.3 向量存储选择
小规模(< 10K 实体):
- 使用 LanceDB(本地文件,简单快速)
大规模(> 10K 实体):
- 使用 Azure AI Search(云服务,高性能)
- 或 Azure Cosmos DB(全球分发)
9.4 日志管理
开发阶段:
- 启用 verbose=True(详细日志)
- 使用控制台输出
生产阶段:
- 写入文件或 Blob Storage
- 配置日志轮转
- 监控 ERROR 级别日志
十、总结
本文档详细介绍了 GraphRAG 的 AI 服务层和支撑模块:
- Language Model:统一的 LLM 调用接口,支持多种提供商
- Prompt Tune:自动生成定制化提示词
- Prompts:预制和自定义提示词模板
- Tokenizer:Token 计数和文本编码
- Vector Stores:向量嵌入存储和检索
- Factory:统一的对象创建接口
- Logger:灵活的日志记录系统
通过合理配置和使用这些模块,可以实现高效、可靠的知识图谱构建和查询。