GraphRAG-05-AI服务层与支撑模块

一、语言模型(Language Model)

1.1 模块概览

Language Model 模块封装了与各种 LLM 提供商的交互,提供统一的调用接口,支持聊天模型和嵌入模型。

1.2 核心组件

ModelManager(模型管理器)

  • 单例模式管理模型实例
  • 支持模型实例复用
  • 自动根据配置创建模型

ChatModel(聊天模型接口)

class ChatModel:
    async def call(
        self,
        messages: list[dict],
        callbacks: list[Callback] | None = None,
        cache: PipelineCache | None = None,
        **kwargs,
    ) -> str:
        """调用聊天模型生成响应"""
    
    async def stream(
        self,
        messages: list[dict],
        **kwargs,
    ) -> AsyncGenerator[str, None]:
        """流式生成响应"""

EmbeddingModel(嵌入模型接口)

class EmbeddingModel:
    async def embed(
        self,
        texts: list[str],
        **kwargs,
    ) -> list[list[float]]:
        """批量生成文本嵌入向量"""

1.3 支持的提供商

提供商 聊天模型 嵌入模型 配置示例
OpenAI type: openai_chat / openai_embedding
Azure OpenAI type: azure_openai_chat / azure_openai_embedding
LiteLLM type: litellm

1.4 配置示例

OpenAI 配置

models:
  default:
    type: openai_chat
    model: gpt-4o-mini
    api_key: ${OPENAI_API_KEY}
    max_tokens: 4000
    temperature: 0.0
    request_timeout: 180
  
  embedding:
    type: openai_embedding
    model: text-embedding-3-small
    api_key: ${OPENAI_API_KEY}

Azure OpenAI 配置

models:
  default:
    type: azure_openai_chat
    model: gpt-4
    api_base: ${AZURE_OPENAI_ENDPOINT}
    api_key: ${AZURE_OPENAI_API_KEY}
    api_version: "2024-02-15-preview"
    deployment_name: my-gpt4-deployment

LiteLLM 配置(支持多种提供商):

models:
  default:
    type: litellm
    model: anthropic/claude-3-sonnet-20240229
    api_key: ${ANTHROPIC_API_KEY}

1.5 核心功能

重试与容错

指数退避重试

# 配置重试策略
retry_config = {
    "max_retries": 10,
    "initial_delay": 1.0,
    "max_delay": 60.0,
    "exponential_base": 2.0,
}

错误处理

  • RateLimitError(429):自动重试
  • TimeoutError:重试或降级
  • APIError(500+):重试
  • AuthenticationError:不重试,直接报错

缓存机制

LLM 调用缓存

# 缓存键生成
cache_key = gen_sha512_hash({
    "messages": messages,
    "model": model_config.model,
    "temperature": model_config.temperature,
    "max_tokens": model_config.max_tokens,
})

# 检查缓存
cached_response = await cache.get(cache_key)
if cached_response:
    return cached_response

# 调用 LLM
response = await llm.call(messages)

# 存储缓存
await cache.set(cache_key, response)

速率限制

并发控制

# 使用 asyncio.Semaphore 控制并发
semaphore = asyncio.Semaphore(concurrent_requests)

async with semaphore:
    response = await llm.call(messages)

二、提示词调优(Prompt Tune)

2.1 模块概览

Prompt Tune 模块根据领域数据自动生成定制化的提示词模板,提高实体抽取和社区报告的质量。

2.2 核心流程

flowchart LR
    Input[输入文档] --> Selection[文档选择]
    Selection --> Chunking[文本分块]
    Chunking --> Analysis[领域分析]
    Analysis --> EntityDiscovery[实体类型发现]
    EntityDiscovery --> PromptGen[提示词生成]
    PromptGen --> Output[输出提示词模板]

2.3 文档选择方法

RANDOM(随机选择)

  • 随机选择 N 个文档
  • 适合数据分布均匀的场景
  • 参数:limit=15

TOP(头部选择)

  • 选择前 N 个文档
  • 适合按时间或重要性排序的数据
  • 参数:limit=15

AUTO(自动聚类选择)

  • 使用 K-means 聚类
  • 从每个聚类中选择代表性文档
  • 适合数据分布不均匀的场景
  • 参数:n_subset_max=300, k=15

2.4 生成的提示词

实体抽取提示词entity_extraction.txt):

You are an AI assistant that helps extract entities and relationships from text.

Domain: {domain}

Entity Types:
{entity_types}

Instructions:
1. Identify all entities of the specified types
2. Extract relationships between entities
3. Provide descriptions for each entity and relationship

Examples:
{examples}

Now, extract entities and relationships from the following text:
{text}

Output format: JSON

社区报告提示词community_report.txt):

Generate a comprehensive report for the given community.

Community Context:
{context}

Include:
1. Summary of the community
2. Key findings
3. Relationships between entities
4. Important themes

Output format: JSON with fields: title, summary, findings, rating

2.5 使用示例

Python API

from graphrag.api import generate_indexing_prompts

await generate_indexing_prompts(
    config=config,
    domain="financial technology",
    selection_method="auto",
    limit=50,
    language="chinese",
    discover_entity_types=True,
    output=Path("./custom_prompts"),
)

CLI

graphrag prompt-tune \
  --domain "medical research" \
  --selection-method auto \
  --n-subset-max 500 \
  --k 20 \
  --output ./custom_prompts

三、提示词模板(Prompts)

3.1 默认提示词模板

GraphRAG 提供了预制的提示词模板,位于 graphrag/prompts/ 目录。

3.2 模板类型

索引阶段提示词

  • index/entity_extraction.py:实体抽取提示词
  • index/summarize_descriptions.py:描述摘要提示词
  • index/community_report.py:社区报告提示词
  • index/claim_extraction.py:声明抽取提示词(协变量)

查询阶段提示词

  • query/local_search.py:局部搜索系统提示词
  • query/global_search_map.py:全局搜索 Map 提示词
  • query/global_search_reduce.py:全局搜索 Reduce 提示词
  • query/drift_search_local.py:DRIFT 搜索局部提示词
  • query/drift_search_reduce.py:DRIFT 搜索 Reduce 提示词

3.3 自定义提示词

方法 1:使用配置文件

extract_graph:
  prompt_file: "./prompts/custom_entity_extraction.txt"

community_reports:
  prompt_file: "./prompts/custom_community_report.txt"

local_search:
  prompt_file: "./prompts/custom_local_search.txt"

方法 2:直接修改模板文件

# graphrag/prompts/index/entity_extraction.py
ENTITY_EXTRACTION_PROMPT = """
Your custom prompt here...

Entity Types: {entity_types}
Text: {text}
"""

四、分词器(Tokenizer)

4.1 模块概览

Tokenizer 模块提供 Token 计数和文本编码功能,用于:

  • 文本分块大小控制
  • 上下文 Token 预算管理
  • LLM API 参数验证

4.2 支持的分词器

TiktokenTokenizer(推荐):

  • 基于 OpenAI 的 tiktoken
  • 支持 GPT-3.5/GPT-4 系列模型
  • 编码模型:cl100k_basep50k_base

LiteLLMTokenizer

  • 基于 LiteLLM 的 token 计数
  • 支持多种模型的 token 计数
  • 自动根据模型名称选择编码

4.3 核心 API

from graphrag.tokenizer import get_tokenizer

# 获取分词器
tokenizer = get_tokenizer(model_config=model_config)

# 编码文本为 tokens
tokens = tokenizer.encode("Hello, world!")
print(f"Token count: {len(tokens)}")

# 解码 tokens 为文本
text = tokenizer.decode(tokens)

# 直接计数(不返回 tokens)
count = tokenizer.count("Hello, world!")

4.4 配置示例

chunks:
  encoding_model: cl100k_base  # Tiktoken 编码模型

五、向量存储(Vector Stores)

5.1 模块概览

Vector Stores 模块提供向量嵌入的存储和检索功能,支持多种向量数据库。

5.2 支持的向量存储

向量存储 类型 适用场景
LanceDB 本地文件数据库 开发测试、小规模部署
Azure AI Search 云服务 生产环境、大规模部署
Azure Cosmos DB 云服务 全球分发、高可用

5.3 BaseVectorStore 接口

class BaseVectorStore(ABC):
    @abstractmethod
    async def load_documents(
        self,
        documents: list[dict],
    ) -> None:
        """批量加载文档和嵌入向量"""
    
    @abstractmethod
    async def similarity_search(
        self,
        query_embedding: list[float],
        k: int = 10,
        **kwargs,
    ) -> list[dict]:
        """向量相似度搜索"""
    
    @abstractmethod
    async def get_document(
        self,
        document_id: str,
    ) -> dict | None:
        """根据 ID 获取文档"""

5.4 配置示例

LanceDB

vector_store:
  entity_description_embedding:
    type: lancedb
    db_uri: "./lancedb"
    container_name: entity-description
    overwrite: false

Azure AI Search

vector_store:
  entity_description_embedding:
    type: azure_ai_search
    api_key: ${AZURE_SEARCH_API_KEY}
    endpoint: ${AZURE_SEARCH_ENDPOINT}
    index_name: graphrag-entities

Azure Cosmos DB

vector_store:
  entity_description_embedding:
    type: cosmosdb
    connection_string: ${COSMOS_DB_CONNECTION_STRING}
    database_name: graphrag
    container_name: entity-embeddings

5.5 使用示例

写入向量

# 在嵌入生成后自动写入
await embed_text(
    input=entities,
    embed_column="description",
    strategy={
        "vector_store": {
            "type": "lancedb",
            "db_uri": "./lancedb",
            "container_name": "entities",
        }
    },
)

向量检索

from graphrag.vector_stores.factory import create_vector_store

# 创建向量存储实例
vector_store = create_vector_store(
    vector_store_type="lancedb",
    db_uri="./lancedb",
    container_name="entities",
)

# 生成查询嵌入
query_embedding = await embedding_model.embed(["What is AI?"])

# 相似度搜索
results = await vector_store.similarity_search(
    query_embedding=query_embedding[0],
    k=10,
)

# 结果包含:id, title, score, embedding
for result in results:
    print(f"ID: {result['id']}, Score: {result['score']}")

六、工厂模式(Factory)

6.1 模块概览

Factory 模块提供统一的对象创建接口,封装复杂的初始化逻辑。

6.2 核心工厂

PipelineFactory(管道工厂)

# 注册工作流
PipelineFactory.register("my_workflow", my_workflow_function)

# 注册管道
PipelineFactory.register_pipeline("my_method", [
    "workflow1",
    "workflow2",
    "workflow3",
])

# 创建管道
pipeline = PipelineFactory.create_pipeline(config, method="my_method")

StorageFactory(存储工厂)

from graphrag.storage.factory import create_storage_from_config

# 根据配置创建存储
storage = create_storage_from_config(config.storage)

CacheFactory(缓存工厂)

from graphrag.cache.factory import create_cache_from_config

# 根据配置创建缓存
cache = create_cache_from_config(config.cache)

VectorStoreFactory(向量存储工厂)

from graphrag.vector_stores.factory import create_vector_store

# 根据类型创建向量存储
vector_store = create_vector_store(
    vector_store_type="lancedb",
    **vector_store_config,
)

七、日志系统(Logger)

7.1 模块概览

Logger 模块提供统一的日志记录功能,支持多种输出目标。

7.2 日志输出目标

标准输出(Console)

  • 默认日志目标
  • 适合开发和调试

文件(File)

  • 日志写入本地文件
  • 支持日志轮转
  • 路径:{reporting.base_dir}/logs/indexing.log

Blob Storage

  • 日志写入 Azure Blob Storage
  • 适合云端部署

7.3 日志级别

级别 说明 使用场景
DEBUG 调试信息 开发调试
INFO 一般信息 正常运行
WARNING 警告信息 潜在问题
ERROR 错误信息 执行失败

7.4 配置示例

reporting:
  type: file
  base_dir: "./logs"

7.5 使用示例

初始化日志

from graphrag.logger.standard_logging import init_loggers

# 初始化日志系统
init_loggers(
    config=config,
    verbose=True,  # 详细日志
    filename="indexing.log",
)

记录日志

import logging

logger = logging.getLogger(__name__)

logger.info("索引构建开始")
logger.debug("加载配置: %s", config)
logger.warning("缓存未命中: %s", key)
logger.error("LLM 调用失败: %s", error)

7.6 BlobWorkflowLogger

功能:将工作流日志写入 Blob Storage

使用场景

  • 云端部署
  • 需要集中管理日志
  • 长期日志存储

配置

reporting:
  type: blob
  connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
  container_name: graphrag-logs
  base_dir: "project1/logs"

八、模块交互示例

8.1 完整索引构建流程

import asyncio
from graphrag.config import load_config
from graphrag.language_model.manager import ModelManager
from graphrag.cache.factory import create_cache_from_config
from graphrag.storage.factory import create_storage_from_config
from graphrag.logger.standard_logging import init_loggers
from graphrag.api import build_index

async def main():
    # 1. 加载配置
    config = load_config(root_dir="./my_project")
    
    # 2. 初始化日志
    init_loggers(config=config, verbose=True)
    
    # 3. 创建存储和缓存(自动由 API 处理)
    # storage = create_storage_from_config(config.storage)
    # cache = create_cache_from_config(config.cache)
    
    # 4. 构建索引(内部使用 ModelManager 创建 LLM)
    results = await build_index(config=config, method="standard")
    
    print("索引构建完成!")

asyncio.run(main())

8.2 自定义 LLM 调用

from graphrag.language_model.manager import ModelManager
from graphrag.cache.json_pipeline_cache import JsonPipelineCache

# 创建模型管理器
model_manager = ModelManager()

# 创建聊天模型
chat_model = model_manager.get_or_create_chat_model(
    name="my_model",
    model_type="openai_chat",
    config={
        "model": "gpt-4o-mini",
        "api_key": "sk-...",
        "temperature": 0.0,
    },
)

# 创建缓存
cache = JsonPipelineCache(base_dir="./cache")

# 调用模型
response = await chat_model.call(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is AI?"},
    ],
    cache=cache,
)

print(response)

九、最佳实践

9.1 模型选择

索引构建

  • 实体抽取:gpt-4o-mini(平衡质量和成本)
  • 社区报告:gpt-4o-minigpt-4-turbo
  • 嵌入:text-embedding-3-small

查询执行

  • 答案生成:gpt-4-turbogpt-4o(高质量)

9.2 缓存策略

开发阶段

  • 使用 memory 缓存(快速测试)
  • 或使用 json 缓存(持久化,便于调试)

生产阶段

  • 使用 file 缓存(Parquet 格式,高效)
  • 定期清理过期缓存

9.3 向量存储选择

小规模(< 10K 实体)

  • 使用 LanceDB(本地文件,简单快速)

大规模(> 10K 实体)

  • 使用 Azure AI Search(云服务,高性能)
  • 或 Azure Cosmos DB(全球分发)

9.4 日志管理

开发阶段

  • 启用 verbose=True(详细日志)
  • 使用控制台输出

生产阶段

  • 写入文件或 Blob Storage
  • 配置日志轮转
  • 监控 ERROR 级别日志

十、总结

本文档详细介绍了 GraphRAG 的 AI 服务层和支撑模块:

  • Language Model:统一的 LLM 调用接口,支持多种提供商
  • Prompt Tune:自动生成定制化提示词
  • Prompts:预制和自定义提示词模板
  • Tokenizer:Token 计数和文本编码
  • Vector Stores:向量嵌入存储和检索
  • Factory:统一的对象创建接口
  • Logger:灵活的日志记录系统

通过合理配置和使用这些模块,可以实现高效、可靠的知识图谱构建和查询。