MetaGPT-06-工具与RAG系统

一、工具系统

1.1 模块概览

工具系统(Tool System)为Agent提供与外部世界交互的能力,包括文件操作、终端命令、网页爬取、API调用等。MetaGPT采用统一的工具注册机制,支持动态加载和智能推荐。

核心组件

  • ToolRegistry:全局工具注册表,管理所有可用工具
  • @register_tool装饰器:声明式工具注册
  • ToolSchema:工具的schema定义(参数、返回值、描述)
  • ToolRecommender:基于需求推荐合适的工具
  • SkillAction:执行工具调用的动作

职责边界

  • 输入:工具名称、参数字典
  • 输出:执行结果(字符串、JSON、文件等)
  • 上游Role(通过tools参数)、SkillAction
  • 下游:具体工具实现(Terminal、Editor、Browser等)

1.2 架构图

flowchart TB
    subgraph 角色层
        Role[Role with tools]
        Assistant[Assistant角色]
    end
    
    subgraph 动作层
        SkillAction[SkillAction]
        ArgumentParsing[ArgumentsParingAction]
    end
    
    subgraph 注册层
        Registry[ToolRegistry全局注册表]
        Decorator[@register_tool装饰器]
        Recommender[ToolRecommender推荐器]
    end
    
    subgraph 工具层
        Terminal[Terminal终端]
        Editor[Editor编辑器]
        Browser[Browser浏览器]
        WebScraping[WebScraping爬虫]
        Git[Git版本控制]
        DataPreprocess[DataPreprocess数据预处理]
        Others[其他15+工具...]
    end
    
    subgraph Schema层
        ToolSchema[ToolSchema定义]
        ToolConvert[代码->Schema转换]
    end
    
    Role --> Assistant
    Assistant --> SkillAction
    SkillAction --> ArgumentParsing
    ArgumentParsing --> Registry
    Registry --> Recommender
    
    Decorator --> Registry
    Terminal --> Decorator
    Editor --> Decorator
    Browser --> Decorator
    WebScraping --> Decorator
    Git --> Decorator
    DataPreprocess --> Decorator
    Others --> Decorator
    
    Registry --> ToolSchema
    ToolConvert --> ToolSchema

1.3 核心数据结构

Tool 工具类

class Tool(BaseModel):
    name: str  # 工具名称
    path: str  # 工具代码路径
    schemas: dict  # schema定义(参数、返回值)
    code: str  # 源代码
    tags: list[str]  # 标签(用于分类和检索)

ToolSchema 定义

class ToolSchema(BaseModel):
    tool_name: str
    description: str
    parameters: dict  # OpenAI Function Calling格式
    returns: dict  # 返回值类型
    examples: list[dict]  # 使用示例

1.4 关键API

1.4.1 register_tool - 注册工具

装饰器用法

@register_tool(tags=["terminal", "execution"])
class Terminal:
    """终端命令执行工具"""
    
    async def execute(self, command: str) -> str:
        """执行Shell命令"""
        # (此处省略实现)
        pass

手动注册

TOOL_REGISTRY.register_tool(
    tool_name="CustomTool",
    tool_path="custom/path.py",
    schemas={
        "description": "自定义工具",
        "methods": {
            "run": {
                "description": "执行工具",
                "parameters": {"query": "str"}
            }
        }
    },
    tags=["custom"]
)

1.4.2 validate_tool_names - 工具验证

功能:验证并加载工具列表

def validate_tool_names(tools: list[str]) -> dict[str, Tool]:
    # 支持三种方式:
    # 1. 工具名称:"Terminal"
    # 2. 工具标签:"terminal"(加载该标签下所有工具)
    # 3. 文件路径:"custom/tool.py"(动态注册)
    pass

使用示例

# 方式1:指定工具名
tools = validate_tool_names(["Terminal", "Editor"])

# 方式2:指定标签
tools = validate_tool_names(["data_analysis"])  # 加载所有数据分析工具

# 方式3:指定方法
tools = validate_tool_names(["Editor:read,write"])  # 仅加载Editor的read和write方法

# 方式4:混合使用
tools = validate_tool_names([
    "Terminal",
    "data_analysis",
    "Editor:read",
    "custom/my_tool.py"
])

1.4.3 SkillAction - 工具执行

核心代码

class SkillAction(Action):
    skill: Skill
    args: Dict
    
    async def run(self, **kwargs) -> Message:
        # 1) 查找并调用工具函数
        rsp = await self.find_and_call_function(
            self.skill.name,
            args=self.args,
            **kwargs
        )
        
        # 2) 封装返回消息
        return Message(content=rsp, role="assistant", cause_by=self)
    
    async def find_and_call_function(self, skill_name: str, args: dict, **kwargs):
        # 1) 动态导入模块
        module_path, func_name = self._parse_skill_path(skill_name)
        module = importlib.import_module(module_path)
        func = getattr(module, func_name)
        
        # 2) 调用函数
        if asyncio.iscoroutinefunction(func):
            result = await func(**args, **kwargs)
        else:
            result = func(**args, **kwargs)
        
        return str(result)

调用流程

sequenceDiagram
    autonumber
    participant Role
    participant SkillAction
    participant ArgumentParsing
    participant Tool
    
    Role->>ArgumentParsing: 解析用户意图
    ArgumentParsing->>ArgumentParsing: LLM提取参数
    ArgumentParsing-->>Role: 返回参数dict
    
    Role->>SkillAction: run(args)
    SkillAction->>SkillAction: find_and_call_function
    SkillAction->>Tool: 调用工具方法
    Tool-->>SkillAction: 返回结果
    SkillAction-->>Role: Message(content=结果)

1.5 内置工具清单

工具名 标签 主要功能 典型使用场景
Terminal terminal 执行Shell命令 运行脚本、环境配置
Editor file 文件CRUD操作 代码编辑、文件管理
Browser web 网页浏览和交互 UI测试、数据采集
WebScraping web 网页内容爬取 信息提取、监控
Git version_control Git操作 代码提交、分支管理
DataPreprocess data_analysis 数据预处理 清洗、转换、标准化
FeatureEngineering data_analysis 特征工程 特征提取、选择
SDEngine image_generation Stable Diffusion AI绘画
EmailLogin communication 邮件收发 邮件通知、自动化
CRTool code_review 代码审查 质量检查、静态分析
Deployer deployment 部署服务 发布、回滚

1.6 工具推荐机制

ToolRecommender基类

class ToolRecommender(BaseModel):
    tools: dict[str, Tool] = {}
    
    async def recommend_tools(self, task: str, topk: int = 5) -> list[Tool]:
        # 子类实现具体推荐策略
        pass

三种推荐策略

  1. TypeMatchToolRecommender:基于类型匹配
# 根据任务关键词匹配工具标签
task = "爬取网页数据"
# 推荐:WebScraping, Browser(包含"web"标签)
  1. BM25ToolRecommender:基于TF-IDF
# 将task和tool descriptions进行BM25相似度计算
# 返回相似度最高的top-k工具
  1. EmbeddingToolRecommender:基于语义向量
# 使用Embedding模型计算task和tool的语义相似度
# 支持模糊匹配和跨语言

使用示例

recommender = EmbeddingToolRecommender(tools=all_tools)
recommended = await recommender.recommend_tools(
    task="我需要下载网页并提取其中的表格数据",
    topk=3
)
# 推荐:WebScraping, DataPreprocess, Editor

1.7 最佳实践

1.7.1 自定义工具

from metagpt.tools.tool_registry import register_tool

@register_tool(tags=["custom", "database"])
class DatabaseQuery:
    """数据库查询工具"""
    
    def __init__(self, connection_string: str):
        self.conn = self._create_connection(connection_string)
    
    async def query(self, sql: str) -> list[dict]:
        """执行SQL查询
        
        Args:
            sql: SQL查询语句
            
        Returns:
            查询结果列表
        """
        cursor = await self.conn.execute(sql)
        results = await cursor.fetchall()
        return [dict(row) for row in results]
    
    async def insert(self, table: str, data: dict) -> int:
        """插入数据
        
        Args:
            table: 表名
            data: 数据字典
            
        Returns:
            受影响的行数
        """
        # (此处省略实现)
        pass

1.7.2 在Role中使用工具

from metagpt.roles import Role
from metagpt.actions.skill_action import SkillAction

class DataAnalystRole(Role):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # 声明需要的工具
        self.tools = [
            "DataPreprocess",
            "FeatureEngineering",
            "Terminal:execute"
        ]
        
        # 工具会自动加载和验证
        self._init_actions([SkillAction])
    
    async def _think(self) -> bool:
        # 根据任务推荐工具
        task = self.rc.important_memory[0].content
        recommended_tools = await self.recommend_tools(task)
        
        # 选择工具并设置参数
        self.rc.todo = SkillAction(
            skill=recommended_tools[0],
            args={"data_path": "data.csv"}
        )
        return True

二、RAG系统

2.1 模块概览

RAG(Retrieval-Augmented Generation)系统为MetaGPT提供知识检索能力,支持从文档库中检索相关信息以增强LLM的生成质量。

核心组件

  • DocumentStore:文档存储和索引
  • Retriever:检索器(BM25、Faiss、Chroma等)
  • SimpleEngine:简化的RAG引擎
  • Embedding:向量化模型

技术栈

  • 向量数据库:Faiss、Chroma、Elasticsearch
  • Embedding模型:OpenAI ada-002、本地模型(BGE、m3e)
  • 索引引擎:LlamaIndex框架

2.2 架构图

flowchart TB
    subgraph 数据摄入层
        Documents[原始文档]
        Chunker[文本分块]
        Embedding[向量化]
    end
    
    subgraph 存储层
        VectorDB[向量数据库<br/>Faiss/Chroma]
        DocStore[文档存储<br/>JSON/SQLite]
        IndexStore[索引元数据]
    end
    
    subgraph 检索层
        Retriever[Retriever检索器]
        BM25Ret[BM25Retriever]
        FaissRet[FAISSRetriever]
        HybridRet[HybridRetriever]
    end
    
    subgraph 引擎层
        SimpleEngine[SimpleEngine]
        QueryEngine[QueryEngine]
    end
    
    subgraph 应用层
        RAGAction[RAG相关Action]
        SearchEnhancedQA[SearchEnhancedQA]
    end
    
    Documents --> Chunker
    Chunker --> Embedding
    Embedding --> VectorDB
    Chunker --> DocStore
    Embedding --> IndexStore
    
    VectorDB --> FaissRet
    DocStore --> Retriever
    IndexStore --> Retriever
    
    BM25Ret --> SimpleEngine
    FaissRet --> SimpleEngine
    HybridRet --> SimpleEngine
    
    SimpleEngine --> QueryEngine
    QueryEngine --> RAGAction
    RAGAction --> SearchEnhancedQA

2.3 核心数据结构

Document 文档类

class Document(BaseModel):
    content: str  # 文档内容
    metadata: dict  # 元数据(来源、时间戳等)
    embedding: Optional[list[float]] = None  # 向量

IndexConfig 索引配置

class FAISSIndexConfig(BaseModel):
    persist_path: Path  # 持久化路径
    dimension: int = 768  # 向量维度
    index_type: str = "flat"  # flat/ivf/hnsw

2.4 关键API

2.4.1 SimpleEngine.from_docs - 构建索引

from metagpt.rag.engines.simple import SimpleEngine
from metagpt.rag.schema import FAISSRetrieverConfig

# 从文档列表构建索引
docs = [
    Document(content="MetaGPT是一个多智能体框架"),
    Document(content="MetaGPT支持角色定制"),
]

engine = SimpleEngine.from_docs(
    docs=docs,
    retriever_configs=[FAISSRetrieverConfig(top_k=3)]
)

2.4.2 SimpleEngine.aretrieve - 检索

# 检索相关文档
results = await engine.aretrieve("如何定制角色?")

for item in results:
    print(f"Score: {item.score}")
    print(f"Content: {item.node.text}")

2.4.3 DocumentStore - 文档管理

from metagpt.document_store import ChromaStore

# 初始化文档库
store = ChromaStore("project_docs")

# 添加文档
await store.add(Document(content="文档内容"))

# 搜索
results = await store.search("查询关键词", top_k=5)

# 删除
await store.delete(doc_ids=["doc_id_1"])

2.5 最佳实践

2.5.1 文档分块策略

from metagpt.rag.chunker import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,  # 块大小(字符数)
    chunk_overlap=50,  # 重叠大小
    separators=["\n\n", "\n", "。", "!", "?"]
)

chunks = splitter.split_text(long_document)

2.5.2 混合检索

from metagpt.rag.retrievers import HybridRetriever

# 结合BM25和向量检索
retriever = HybridRetriever(
    bm25_weight=0.3,
    vector_weight=0.7,
    top_k=10
)

results = await retriever.retrieve(query)

2.5.3 在Action中使用RAG

from metagpt.actions import Action

class RAGEnhancedAction(Action):
    def __init__(self, knowledge_base: SimpleEngine, **kwargs):
        super().__init__(**kwargs)
        self.kb = knowledge_base
    
    async def run(self, query: str) -> Message:
        # 1) 检索相关文档
        docs = await self.kb.aretrieve(query, top_k=3)
        context = "\n".join([doc.node.text for doc in docs])
        
        # 2) 构造增强提示词
        prompt = f"""参考以下文档回答问题:

{context}

问题:{query}

回答:"""
        
        # 3) 调用LLM
        response = await self._aask(prompt)
        return Message(content=response)

三、策略与学习系统

3.1 模块概览

策略系统(Strategy)和经验池(ExpPool)为MetaGPT提供高级推理能力和持续学习机制。

核心组件

  • TreeofThought (ToT):树状思维推理
  • MCTSSolver:蒙特卡洛树搜索求解器
  • ExpPool:经验池,存储成功/失败案例
  • Scorer:经验评分器

3.2 ToT 树状思维

原理:通过树搜索探索多条推理路径,选择最优解。

from metagpt.strategy.tot import TreeofThought, MCTSSolver

tot = TreeofThought(
    solver=MCTSSolver(
        max_iterations=10,
        exploration_weight=1.4
    )
)

solution = await tot.solve(
    problem="设计一个分布式缓存系统",
    initial_prompt="首先分析需求"
)

搜索树可视化

graph TD
    Root[问题:设计缓存系统] --> A1[方案A:Redis集群]
    Root --> A2[方案B:Memcached]
    Root --> A3[方案C:自研]
    
    A1 --> B1[数据分片策略]
    A1 --> B2[一致性哈希]
    B2 --> C1[虚拟节点数量]
    B2 --> C2[故障转移]
    
    style C2 fill:#9f9

3.3 ExpPool 经验池

功能:存储和检索成功的执行案例。

from metagpt.exp_pool.manager import ExpPoolManager

# 初始化经验池
pool = ExpPoolManager(path="exp_pool.json")

# 添加经验
pool.add_experience(
    task="生成单元测试",
    solution="使用pytest框架...",
    score=0.95
)

# 检索相似经验
similar_exps = pool.retrieve(
    query="如何生成测试用例",
    top_k=3
)

评分器

from metagpt.exp_pool.scorer import Scorer

scorer = Scorer()
score = await scorer.score(
    task="编写Dockerfile",
    solution=dockerfile_content
)
# 返回0-1之间的分数

3.4 最佳实践

使用ToT解决复杂问题

class ComplexProblemSolver(Role):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.tot = TreeofThought()
    
    async def _act(self) -> Message:
        problem = self.rc.memory.get()[-1].content
        
        # 使用ToT求解
        solution = await self.tot.solve(
            problem=problem,
            max_depth=5,
            num_candidates=3
        )
        
        return Message(content=solution)