1. Role 执行循环时序图

1.1 核心执行流程

Role类的执行循环是MetaGPT智能体系统的核心机制,实现了"观察-思考-行动"的智能体范式。

关键函数:Role.run()

位置: metagpt/roles/role.py:530
功能: 角色主执行入口,实现完整的智能体执行周期

@role_raise_decorator
async def run(self, with_message=None) -> Message | None:
    """观察,思考并根据观察结果行动
    
    Args:
        with_message: 外部输入消息,可以是字符串、Message对象或消息列表
        
    Returns:
        Message | None: 执行结果消息,如果无新消息则返回None
        
    执行流程:
        1. 预处理输入消息并放入消息缓冲区
        2. 观察环境中的新消息
        3. 如果有新消息,进行反应(思考+行动)
        4. 重置状态并发布响应消息
    """
    # 1. 消息预处理:将外部输入转换为标准Message格式
    if with_message:
        msg = None
        if isinstance(with_message, str):
            msg = Message(content=with_message)
        elif isinstance(with_message, Message):
            msg = with_message
        elif isinstance(with_message, list):
            msg = Message(content="\n".join(with_message))
        if not msg.cause_by:
            msg.cause_by = UserRequirement  # 设置消息来源
        self.put_message(msg)  # 放入私有消息缓冲区
    
    # 2. 观察阶段:检查是否有新消息需要处理
    if not await self._observe():
        logger.debug(f"{self._setting}: no news. waiting.")
        return  # 无新消息时暂停等待

    # 3. 反应阶段:根据观察结果进行思考和行动
    rsp = await self.react()

    # 4. 状态重置:清理当前任务状态
    self.set_todo(None)
    
    # 5. 消息发布:将响应发送到环境中
    self.publish_message(rsp)
    return rsp

关键函数:Role._observe()

位置: metagpt/roles/role.py:399
功能: 消息观察和过滤机制

async def _observe(self) -> int:
    """从消息缓冲区准备新消息进行处理
    
    Returns:
        int: 新消息数量
        
    处理逻辑:
        1. 从消息缓冲区获取未处理消息
        2. 与历史记忆对比,过滤重复消息
        3. 根据关注列表过滤感兴趣的消息
        4. 更新角色记忆系统
    """
    # 获取新消息
    news = []
    if self.recovered and self.latest_observed_msg:
        # 恢复模式:从上次观察的消息开始
        news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)
    if not news:
        # 正常模式:从缓冲区获取所有消息
        news = self.rc.msg_buffer.pop_all()
    
    # 获取历史消息用于去重
    old_messages = [] if not self.enable_memory else self.rc.memory.get()
    
    # 过滤感兴趣的消息
    self.rc.news = [
        n for n in news 
        if (n.cause_by in self.rc.watch or self.name in n.send_to) 
        and n not in old_messages
    ]
    
    # 更新记忆系统
    if self.observe_all_msg_from_buffer:
        self.rc.memory.add_batch(news)  # 保存所有消息
    else:
        self.rc.memory.add_batch(self.rc.news)  # 只保存感兴趣的消息
    
    # 记录最后观察的消息用于恢复
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None
    return len(self.rc.news)

关键函数:Role.react()

位置: metagpt/roles/role.py:512
功能: 反应策略调度器

async def react(self) -> Message:
    """根据反应模式选择执行策略
    
    Returns:
        Message: 反应结果消息
        
    支持的反应模式:
        - REACT: 动态选择动作
        - BY_ORDER: 按顺序执行动作
        - PLAN_AND_ACT: 先计划后执行
    """
    if self.rc.react_mode == RoleReactMode.REACT or self.rc.react_mode == RoleReactMode.BY_ORDER:
        rsp = await self._react()
    elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:
        rsp = await self._plan_and_act()
    else:
        raise ValueError(f"Unsupported react mode: {self.rc.react_mode}")
    
    # 重置状态:当前反应完成,状态回到初始值
    self._set_state(state=-1)
    
    # 添加智能体信息
    if isinstance(rsp, AIMessage):
        rsp.with_agent(self._setting)
    return rsp
Mermaid Chart MetaGPT_核心模块时序图-0

1.2 调用链分析

Environment.run()
    └── Role.run()
        ├── Role._observe()
        │   ├── MessageQueue.pop_all()
        │   ├── Memory.find_news()
        │   └── Memory.add_batch()
        ├── Role.react()
        │   └── Role._react() / Role._plan_and_act()
        │       ├── Role._think()
        │       │   └── LLM.aask() [条件性调用]
        │       └── Role._act()
        │           └── Action.run()
        │               └── LLM.aask()
        └── Environment.publish_message()

2. ActionNode 填充时序图

2.1 结构化输出处理机制

ActionNode是MetaGPT实现结构化输出的核心组件,它将自然语言提示转换为结构化数据。

关键函数:ActionNode.fill()

位置: metagpt/actions/action_node.py:597
功能: 节点填充的核心方法,实现结构化输出处理

@exp_cache(serializer=ActionNodeSerializer())
async def fill(
    self,
    *,
    req,                    # 请求上下文,包含所有必要信息
    llm,                    # 大语言模型实例
    schema="json",          # 输出格式:raw/json/markdown
    mode="auto",            # 填充模式:auto/children/root
    strgy="simple",         # 策略:simple/complex
    images=None,            # 图像输入(多模态支持)
    timeout=USE_CONFIG_TIMEOUT,  # 超时设置
    exclude=[],             # 排除的字段
    function_name=None,     # 函数名(代码填充模式)
):
    """填充节点内容的主要方法
    
    Args:
        req: 填充节点时需要的所有上下文信息
        llm: 预定义系统消息的大语言模型
        schema: 输出格式 - raw(自由文本)/json(结构化)/markdown(标记)
        mode: 填充模式 - auto(自动)/children(子节点)/root(根节点)
        strgy: 执行策略 - simple(单次运行)/complex(逐节点运行)
        images: 图像URL或base64列表(GPT-4V支持)
        timeout: LLM调用超时时间
        exclude: 要排除的ActionNode键列表
        
    Returns:
        self: 填充后的ActionNode实例
    """
    # 1. 初始化设置
    self.set_llm(llm)           # 设置LLM实例
    self.set_context(req)       # 设置上下文
    if self.schema:
        schema = self.schema    # 使用节点预定义的schema
    
    # 2. 特殊填充模式处理
    if mode == FillMode.CODE_FILL.value:
        # 代码填充模式:专门处理代码生成
        result = await self.code_fill(context, function_name, timeout)
        self.instruct_content = self.create_class()(**result)
        return self
    
    elif mode == FillMode.XML_FILL.value:
        # XML填充模式:处理XML格式输出
        context = self.xml_compile(context=self.context)
        result = await self.xml_fill(context, images=images)
        self.instruct_content = self.create_class()(**result)
        return self
    
    elif mode == FillMode.SINGLE_FILL.value:
        # 单一填充模式:处理单个字段
        result = await self.single_fill(context, images=images)
        self.instruct_content = self.create_class()(**result)
        return self
    
    # 3. 标准填充流程
    if strgy == "simple":
        # 简单策略:一次性处理所有字段
        return await self.simple_fill(
            schema=schema, mode=mode, images=images, 
            timeout=timeout, exclude=exclude
        )
    elif strgy == "complex":
        # 复杂策略:逐个处理子节点
        tmp = {}
        for _, child_node in self.children.items():
            if exclude and child_node.key in exclude:
                continue  # 跳过排除的字段
            
            # 递归填充子节点
            child = await child_node.simple_fill(
                schema=schema, mode=mode, images=images, 
                timeout=timeout, exclude=exclude
            )
            tmp.update(child.instruct_content.model_dump())
        
        # 创建组合类并实例化
        cls = self._create_children_class()
        self.instruct_content = cls(**tmp)
        return self

关键函数:ActionNode.compile()

位置: metagpt/actions/action_node.py:382
功能: 提示模板编译器

def compile(self, context, schema="json", mode="children", template=SIMPLE_TEMPLATE, exclude=[]) -> str:
    """将节点定义编译为LLM提示模板
    
    Args:
        context: 上下文信息
        schema: 输出格式 - raw/json/markdown
        mode: 编译模式 - all/root/children
        template: 提示模板
        exclude: 排除的字段列表
        
    Returns:
        str: 编译后的提示字符串
        
    模板组成:
        - context: 历史消息和当前请求
        - instruction: 字段定义和类型约束
        - example: 期望的输出格式示例
        - constraint: 语言和格式约束
    """
    if schema == "raw":
        # 原始模式:自由文本输出
        return f"{context}\n\n## Actions\n{LANGUAGE_CONSTRAINT}\n{self.instruction}"

    # 编译各个组件
    instruction = self.compile_instruction(schema="markdown", mode=mode, exclude=exclude)
    example = self.compile_example(schema=schema, tag=TAG, mode=mode, exclude=exclude)
    constraints = [LANGUAGE_CONSTRAINT, FORMAT_CONSTRAINT]
    constraint = "\n".join(constraints)

    # 组装完整提示
    prompt = template.format(
        context=context,
        example=example,
        instruction=instruction,
        constraint=constraint,
    )
    return prompt

关键函数:Action._run_action_node()

位置: metagpt/actions/action.py:103
功能: Action与ActionNode的桥接方法

async def _run_action_node(self, *args, **kwargs):
    """运行动作节点
    
    Args:
        args[0]: 历史消息列表
        
    Returns:
        ActionNode: 填充后的节点实例
        
    处理流程:
        1. 构建历史消息上下文
        2. 调用节点填充方法
        3. 返回结构化结果
    """
    msgs = args[0]
    # 构建上下文:将历史消息格式化
    context = "## History Messages\n"
    context += "\n".join([f"{idx}: {i}" for idx, i in enumerate(reversed(msgs))])
    
    # 调用节点填充
    return await self.node.fill(req=context, llm=self.llm)
Mermaid Chart MetaGPT_核心模块时序图-1

2.2 调用链分析

Action.run()
    └── Action._run_action_node()
        └── ActionNode.fill()
            ├── ActionNode.set_llm()
            ├── ActionNode.set_context()
            ├── ActionNode.compile()
            │   ├── ActionNode.compile_instruction()
            │   ├── ActionNode.compile_example()
            │   └── ActionNode.get_mapping()
            ├── ActionNode._aask_v1()
            │   └── LLM.aask()
            ├── OutputParser.parse_data_with_mapping()
            ├── ActionNode.create_model_class()
            └── PydanticModel.__init__()

3. Environment 消息路由时序图

3.1 消息路由机制

Environment类是MetaGPT中智能体间通信的核心基础设施,负责消息的路由分发和环境管理。

关键函数:Environment.publish_message()

位置: metagpt/environment/base_env.py:175
功能: 消息路由分发的核心方法

def publish_message(self, message: Message, peekable: bool = True) -> bool:
    """将消息分发给接收者
    
    根据RFC 116第2.2.1章节的消息路由结构设计,以及RFC 113中整个系统的规划,
    消息中的路由信息只负责指定消息接收者,不关心消息接收者在哪里。
    如何将消息路由到消息接收者是RFC 113设计的传输框架要解决的问题。
    
    Args:
        message: 要发布的消息对象
        peekable: 是否可窥视(调试用)
        
    Returns:
        bool: 发布是否成功
        
    路由算法:
        1. 遍历所有角色及其地址
        2. 检查消息的send_to是否匹配角色地址
        3. 将消息投递到匹配的角色消息缓冲区
        4. 记录消息到历史中用于调试
    """
    logger.debug(f"publish_message: {message.dump()}")
    found = False
    
    # 根据RFC 113第2.2.3.2章节的路由功能计划
    for role, addrs in self.member_addrs.items():
        if is_send_to(message, addrs):
            # 将消息放入角色的私有消息缓冲区
            role.put_message(message)
            found = True
    
    if not found:
        # 记录无接收者的消息警告
        logger.warning(f"Message no recipients: {message.dump()}")
    
    # 添加到历史记录用于调试
    self.history.add(message)
    return True

关键函数:Environment.run()

位置: metagpt/environment/base_env.py:197
功能: 环境运行引擎,并发执行所有角色

async def run(self, k=1):
    """处理一次所有角色的运行
    
    Args:
        k: 运行轮数,默认为1
        
    执行机制:
        1. 收集所有非空闲角色的执行任务
        2. 使用asyncio.gather()并发执行所有角色
        3. 记录环境空闲状态用于调试
        
    并发设计:
        - 跳过空闲角色,避免不必要的等待
        - 并发执行提高系统整体性能
        - 异常隔离,单个角色错误不影响其他角色
    """
    for _ in range(k):
        futures = []
        
        # 收集所有活跃角色的执行任务
        for role in self.roles.values():
            if role.is_idle:
                continue  # 跳过空闲角色
            future = role.run()
            futures.append(future)

        # 并发执行所有角色
        if futures:
            await asyncio.gather(*futures)
        
        logger.debug(f"is idle: {self.is_idle}")

关键函数:Environment.add_roles()

位置: metagpt/environment/base_env.py:164
功能: 角色管理和环境绑定

def add_roles(self, roles: Iterable[BaseRole]):
    """增加一批角色到当前环境
    
    Args:
        roles: 要添加的角色列表
        
    处理流程:
        1. 将角色添加到环境的角色字典中
        2. 设置角色的环境引用
        3. 建立角色地址映射关系
        
    设计要点:
        - 双向绑定:环境持有角色引用,角色持有环境引用
        - 地址管理:维护角色名称到地址的映射
        - 上下文共享:角色继承环境的上下文配置
    """
    for role in roles:
        # 添加到角色字典,使用名称作为键
        self.roles[role.name] = role
        
        # 建立地址映射关系
        self.member_addrs[role] = {role.name}

    # 设置角色环境和上下文
    for role in roles:
        role.context = self.context  # 共享上下文配置
        role.set_env(self)           # 设置环境引用
Mermaid Chart MetaGPT_核心模块时序图-2

3.2 调用链分析

Environment.run()
    ├── Role.is_idle [检查]
    ├── Role.run() [并发收集]
    └── asyncio.gather(*futures) [并发执行]

Environment.publish_message()
    ├── is_send_to(message, addrs) [地址匹配]
    ├── Role.put_message(message) [消息投递]
    └── History.add(message) [历史记录]

Environment.add_roles()
    ├── roles[role.name] = role [角色注册]
    ├── member_addrs[role] = {role.name} [地址映射]
    └── role.set_env(self) [环境绑定]

4. Team 运行时序图

4.1 团队协作机制

Team类是MetaGPT中多智能体协作的顶层管理器,负责协调各个角色的执行和资源管理。

关键函数:Team.run()

位置: metagpt/team.py:123
功能: 团队主执行循环,控制整个协作流程

@serialize_decorator
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
    """运行团队直到目标轮数或资金耗尽
    
    Args:
        n_round: 最大运行轮数
        idea: 初始想法或需求
        send_to: 消息发送目标
        auto_archive: 是否自动归档
        
    Returns:
        History: 执行历史记录
        
    执行流程:
        1. 发布初始需求到环境
        2. 循环执行直到达到轮数限制或所有角色空闲
        3. 每轮检查预算并执行环境运行
        4. 自动归档项目结果
    """
    if idea:
        self.run_project(idea=idea, send_to=send_to)

    while n_round > 0:
        if self.env.is_idle:  # 所有角色都空闲
            logger.debug("All roles are idle.")
            break
        n_round -= 1
        self._check_balance()  # 检查预算
        await self.env.run()   # 环境运行一轮
        logger.debug(f"max {n_round=} left.")
    
    self.env.archive(auto_archive)  # 归档项目
    return self.env.history

关键函数:Team._check_balance()

位置: metagpt/team.py:95
功能: 预算控制机制

def _check_balance(self):
    """检查预算余额,防止成本失控
    
    Raises:
        NoMoneyException: 当总成本超过预算限制时抛出
        
    成本控制策略:
        - 实时监控LLM调用成本
        - 设置预算上限防止无限制消费
        - 提供详细的成本报告
    """
    if self.cost_manager.total_cost >= self.cost_manager.max_budget:
        raise NoMoneyException(
            self.cost_manager.total_cost, 
            f"资金不足: {self.cost_manager.max_budget}"
        )
sequenceDiagram
    participant CLI as 命令行
    participant Team as Team
    participant Env as Environment
    participant Role1 as ProductManager
    participant Role2 as Architect
    participant Role3 as Engineer
    participant CostMgr as CostManager
    
    CLI->>Team: run(n_round, idea)
    Team->>Env: publish_message(Message(idea))
    
    loop n_round 轮
        Team->>CostMgr: _check_balance()
        alt 预算充足
            Team->>Env: run()
            
            par 并发执行角色
                Env->>Role1: run()
                Role1->>Role1: observe->think->act
                Role1->>Env: publish_message(prd)
            and
                Env->>Role2: run()
                Role2->>Role2: observe->think->act  
                Role2->>Env: publish_message(design)
            and
                Env->>Role3: run()
                Role3->>Role3: observe->think->act
                Role3->>Env: publish_message(code)
            end
            
            Team->>Env: is_idle?
            alt 所有角色空闲
                break 提前结束
            end
        else 预算不足
            Team->>Team: raise NoMoneyException
        end
    end
    
    Team->>Env: archive(auto_archive)
    Team-->>CLI: env.history