核心执行路径函数深度解析

1. 启动入口函数

generate_repo() - 主启动函数

位置: metagpt/software_company.py:14 职责: 协调整个软件开发流程的启动

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def generate_repo(
    idea,
    investment=3.0,
    n_round=5,
    code_review=True,
    run_tests=False,
    implement=True,
    project_name="",
    inc=False,
    project_path="",
    reqa_file="",
    max_auto_summarize_code=0,
    recover_path=None,
):

关键步骤:

  1. 配置初始化: config.update_via_cli() - 解析命令行参数并更新配置
  2. 上下文创建: Context(config=config) - 创建全局上下文对象
  3. 团队组建: Team(context=ctx) - 初始化团队实例
  4. 角色雇佣: company.hire([...]) - 添加各种专业角色
  5. 投资设置: company.invest(investment) - 设置预算限制
  6. 异步执行: asyncio.run(company.run(...)) - 启动主执行循环

设计亮点:

  • 支持从序列化状态恢复: Team.deserialize(stg_path)
  • 灵活的角色配置: 可选择性启用不同角色
  • 预算控制机制: 防止无限制的LLM调用成本

2. 团队管理函数

Team.run() - 团队主执行循环

位置: metagpt/team.py:123 职责: 控制整个团队的运行节奏和生命周期

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@serialize_decorator
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
    """Run company until target round or no money"""
    if idea:
        self.run_project(idea=idea, send_to=send_to)

    while n_round > 0:
        if self.env.is_idle:
            logger.debug("All roles are idle.")
            break
        n_round -= 1
        self._check_balance()
        await self.env.run()
        logger.debug(f"max {n_round=} left.")
    
    self.env.archive(auto_archive)
    return self.env.history

执行逻辑:

  1. 需求发布: 如果有新想法,通过run_project()发布到环境
  2. 轮次控制: 最多执行n_round轮,防止无限循环
  3. 空闲检测: 所有角色都空闲时提前结束
  4. 预算检查: 每轮检查是否超出预算限制
  5. 环境运行: 委托给环境执行一轮所有角色
  6. 自动归档: 完成后自动保存项目状态

关键机制:

  • @serialize_decorator: 自动序列化团队状态
  • 预算控制: _check_balance() 防止成本失控
  • 优雅退出: 多种退出条件确保程序正常结束

Team.hire() - 角色雇佣

位置: metagpt/team.py:83

1
2
3
def hire(self, roles: list[Role]):
    """Hire roles to cooperate"""
    self.env.add_roles(roles)

简洁设计: 直接委托给环境的add_roles()方法,体现了单一职责原则。


3. 环境管理函数

Environment.run() - 环境执行引擎

位置: metagpt/environment/base_env.py:197 职责: 并发执行所有活跃角色

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
async def run(self, k=1):
    """处理一次所有信息的运行
    Process all Role runs at once
    """
    for _ in range(k):
        futures = []
        for role in self.roles.values():
            if role.is_idle:
                continue
            future = role.run()
            futures.append(future)

        if futures:
            await asyncio.gather(*futures)
        logger.debug(f"is idle: {self.is_idle}")

并发设计:

  • 异步收集: 收集所有非空闲角色的执行任务
  • 并发执行: 使用asyncio.gather()并发执行所有角色
  • 性能优化: 跳过空闲角色,避免不必要的等待

Environment.publish_message() - 消息路由核心

位置: metagpt/environment/base_env.py:175 职责: 实现消息的智能路由分发

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def publish_message(self, message: Message, peekable: bool = True) -> bool:
    """
    Distribute the message to the recipients.
    """
    logger.debug(f"publish_message: {message.dump()}")
    found = False
    # 根据 RFC 113 的路由特性计划
    for role, addrs in self.member_addrs.items():
        if is_send_to(message, addrs):
            role.put_message(message)
            found = True
    if not found:
        logger.warning(f"Message no recipients: {message.dump()}")
    self.history.add(message)  # For debug
    return True

路由算法:

  1. 地址匹配: 检查消息的send_to是否匹配角色的订阅地址
  2. 消息投递: 将消息放入匹配角色的私有消息缓冲区
  3. 异常处理: 记录无接收者的消息警告
  4. 历史记录: 保存所有消息用于调试和回溯

设计特点:

  • 基于RFC 113的路由设计,职责清晰
  • 支持一对多广播和点对点通信
  • 完整的消息追踪和调试支持

4. 角色执行函数

Role.run() - 角色主执行入口

位置: metagpt/roles/role.py:530 职责: 实现角色的完整执行周期

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@role_raise_decorator
async def run(self, with_message=None) -> Message | None:
    """Observe, and think and act based on the results of the observation"""
    if with_message:
        msg = self._process_input_message(with_message)
        self.put_message(msg)
    
    if not await self._observe():
        # If there is no new information, suspend and wait
        logger.debug(f"{self._setting}: no news. waiting.")
        return

    rsp = await self.react()

    # Reset the next action to be taken.
    self.set_todo(None)
    # Send the response message to the Environment object
    self.publish_message(rsp)
    return rsp

执行流程:

  1. 消息预处理: 处理外部输入消息
  2. 观察阶段: _observe() 检查新消息
  3. 反应阶段: react() 思考并执行动作
  4. 状态重置: 清理当前任务状态
  5. 结果发布: 将响应发布到环境

Role._observe() - 消息观察机制

位置: metagpt/roles/role.py:399 职责: 从消息缓冲区筛选和处理新消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def _observe(self) -> int:
    """Prepare new messages for processing from the message buffer and other sources."""
    # Read unprocessed messages from the msg buffer.
    news = []
    if self.recovered and self.latest_observed_msg:
        news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)
    if not news:
        news = self.rc.msg_buffer.pop_all()
    
    # Store the read messages in your own memory to prevent duplicate processing.
    old_messages = [] if not self.enable_memory else self.rc.memory.get()
    
    # Filter in messages of interest.
    self.rc.news = [
        n for n in news 
        if (n.cause_by in self.rc.watch or self.name in n.send_to) 
        and n not in old_messages
    ]
    
    if self.observe_all_msg_from_buffer:
        self.rc.memory.add_batch(news)
    else:
        self.rc.memory.add_batch(self.rc.news)
    
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None
    return len(self.rc.news)

过滤逻辑:

  1. 消息获取: 从缓冲区获取所有未处理消息
  2. 重复检测: 与已有记忆对比,避免重复处理
  3. 兴趣过滤: 只关注订阅的动作类型或直接发送给自己的消息
  4. 记忆更新: 将新消息添加到记忆系统
  5. 状态跟踪: 记录最后观察到的消息用于恢复

Role._think() - 决策思考机制

位置: metagpt/roles/role.py:340 职责: 根据当前状态和历史决定下一步动作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def _think(self) -> bool:
    """Consider what to do and decide on the next course of action."""
    if len(self.actions) == 1:
        # If there is only one action, then only this one can be performed
        self._set_state(0)
        return True

    if self.recovered and self.rc.state >= 0:
        self._set_state(self.rc.state)  # action to run from recovered state
        self.recovered = False
        return True

    if self.rc.react_mode == RoleReactMode.BY_ORDER:
        if self.rc.max_react_loop != len(self.actions):
            self.rc.max_react_loop = len(self.actions)
        self._set_state(self.rc.state + 1)
        return self.rc.state >= 0 and self.rc.state < len(self.actions)

    # 使用LLM动态选择动作
    prompt = self._get_prefix() + STATE_TEMPLATE.format(
        history=self.rc.history,
        states="\n".join(self.states),
        n_states=len(self.states) - 1,
        previous_state=self.rc.state,
    )

    next_state = await self.llm.aask(prompt)
    next_state = extract_state_value_from_output(next_state)
    
    if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):
        logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
        next_state = -1
    else:
        next_state = int(next_state)
        if next_state == -1:
            logger.info(f"End actions with {next_state=}")
    
    self._set_state(next_state)
    return True

决策策略:

  1. 单动作模式: 只有一个动作时直接执行
  2. 恢复模式: 从序列化状态恢复时继续之前的动作
  3. 顺序模式: 按预定义顺序执行动作
  4. 智能模式: 使用LLM根据历史和当前状态选择最佳动作

状态管理:

  • 状态验证确保选择的动作索引有效
  • 支持-1状态表示任务完成
  • 完善的错误处理和日志记录

Role._act() - 动作执行机制

位置: metagpt/roles/role.py:381 职责: 执行当前选定的动作并生成响应

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    response = await self.rc.todo.run(self.rc.history)
    
    if isinstance(response, (ActionOutput, ActionNode)):
        msg = AIMessage(
            content=response.content,
            instruct_content=response.instruct_content,
            cause_by=self.rc.todo,
            sent_from=self,
        )
    elif isinstance(response, Message):
        msg = response
    else:
        msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)
    
    self.rc.memory.add(msg)
    return msg

响应处理:

  1. 动作执行: 调用当前待办动作的run()方法
  2. 响应封装: 根据响应类型创建合适的消息对象
  3. 记忆更新: 将响应添加到角色记忆中
  4. 消息返回: 返回格式化的消息对象

5. 动作执行函数

Action.run() - 动作执行入口

位置: metagpt/actions/action.py:110 职责: 执行具体的动作逻辑

1
2
3
4
5
async def run(self, *args, **kwargs):
    """Run action"""
    if self.node:
        return await self._run_action_node(*args, **kwargs)
    raise NotImplementedError("The run method should be implemented in a subclass.")

Action._run_action_node() - ActionNode执行

位置: metagpt/actions/action.py:103

1
2
3
4
5
6
async def _run_action_node(self, *args, **kwargs):
    """Run action node"""
    msgs = args[0]
    context = "## History Messages\n"
    context += "\n".join([f"{idx}: {i}" for idx, i in enumerate(reversed(msgs))])
    return await self.node.fill(req=context, llm=self.llm)

上下文构建:

  • 将历史消息格式化为上下文
  • 按时间倒序排列,最新消息在前
  • 委托给ActionNode进行结构化处理

6. ActionNode核心函数

ActionNode.fill() - 结构化填充核心

位置: metagpt/actions/action_node.py:597 职责: 实现结构化的LLM输出处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@exp_cache(serializer=ActionNodeSerializer())
async def fill(
    self,
    *,
    req,
    llm,
    schema="json",
    mode="auto",
    strgy="simple",
    images: Optional[Union[str, list[str]]] = None,
    timeout=USE_CONFIG_TIMEOUT,
    exclude=[],
    function_name: str = None,
):
    """Fill the node(s) with mode."""
    self.set_llm(llm)
    self.set_context(req)
    if self.schema:
        schema = self.schema

    if mode == FillMode.CODE_FILL.value:
        result = await self.code_fill(context, function_name, timeout)
        self.instruct_content = self.create_class()(**result)
        return self

    elif mode == FillMode.XML_FILL.value:
        context = self.xml_compile(context=self.context)
        result = await self.xml_fill(context, images=images)
        self.instruct_content = self.create_class()(**result)
        return self

    elif mode == FillMode.SINGLE_FILL.value:
        result = await self.single_fill(context, images=images)
        self.instruct_content = self.create_class()(**result)
        return self

    if strgy == "simple":
        return await self.simple_fill(schema=schema, mode=mode, images=images, timeout=timeout, exclude=exclude)
    elif strgy == "complex":
        # 复杂策略:逐个处理子节点
        tmp = {}
        for _, i in self.children.items():
            if exclude and i.key in exclude:
                continue
            child = await i.simple_fill(schema=schema, mode=mode, images=images, timeout=timeout, exclude=exclude)
            tmp.update(child.instruct_content.model_dump())
        cls = self._create_children_class()
        self.instruct_content = cls(**tmp)
        return self

多模式支持:

  • CODE_FILL: 专门处理代码生成,支持函数名提取
  • XML_FILL: XML格式的结构化输出
  • SINGLE_FILL: 单一字段填充
  • 标准模式: JSON/Markdown结构化输出

策略选择:

  • simple: 一次性处理所有字段
  • complex: 逐个处理子节点,适合复杂嵌套结构

ActionNode.compile() - 提示模板编译

位置: metagpt/actions/action_node.py:382 职责: 将节点定义编译为LLM提示模板

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def compile(self, context, schema="json", mode="children", template=SIMPLE_TEMPLATE, exclude=[]) -> str:
    """
    编译提示模板
    mode: all/root/children
    schema: raw/json/markdown
    """
    if schema == "raw":
        return f"{context}\n\n## Actions\n{LANGUAGE_CONSTRAINT}\n{self.instruction}"

    instruction = self.compile_instruction(schema="markdown", mode=mode, exclude=exclude)
    example = self.compile_example(schema=schema, tag=TAG, mode=mode, exclude=exclude)
    constraints = [LANGUAGE_CONSTRAINT, FORMAT_CONSTRAINT]
    constraint = "\n".join(constraints)

    prompt = template.format(
        context=context,
        example=example,
        instruction=instruction,
        constraint=constraint,
    )
    return prompt

模板组成:

  1. 上下文: 历史消息和当前请求
  2. 指令: 字段定义和类型约束
  3. 示例: 期望的输出格式示例
  4. 约束: 语言和格式约束

7. 记忆管理函数

Memory.add() - 消息添加

位置: metagpt/memory/memory.py:27

1
2
3
4
5
6
7
8
9
def add(self, message: Message):
    """Add a new message to storage, while updating the index"""
    if self.ignore_id:
        message.id = IGNORED_MESSAGE_ID
    if message in self.storage:
        return
    self.storage.append(message)
    if message.cause_by:
        self.index[message.cause_by].append(message)

索引维护:

  • 按动作类型建立倒排索引
  • 避免重复消息
  • 支持ID忽略模式

Memory.get_by_actions() - 按动作检索

位置: metagpt/memory/memory.py:99

1
2
3
4
5
6
7
8
9
def get_by_actions(self, actions: Set) -> list[Message]:
    """Return all messages triggered by specified Actions"""
    rsp = []
    indices = any_to_str_set(actions)
    for action in indices:
        if action not in self.index:
            continue
        rsp += self.index[action]
    return rsp

高效检索:

  • 利用倒排索引快速定位
  • 支持多动作类型查询
  • 自动处理不存在的索引

8. LLM调用函数

BaseLLM.aask() - 异步LLM调用

位置: metagpt/provider/base_llm.py:179

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async def aask(
    self,
    msg: Union[str, list[dict[str, str]]],
    system_msgs: Optional[list[str]] = None,
    format_msgs: Optional[list[dict[str, str]]] = None,
    images: Optional[Union[str, list[str]]] = None,
    timeout=USE_CONFIG_TIMEOUT,
    stream=None,
) -> str:
    if system_msgs:
        message = self._system_msgs(system_msgs)
    else:
        message = [self._default_system_msg()]
    if not self.use_system_prompt:
        message = []
    if format_msgs:
        message.extend(format_msgs)
    if isinstance(msg, str):
        message.append(self._user_msg(msg, images=images))
    else:
        message.extend(msg)
    if stream is None:
        stream = self.config.stream

    # 图像数据替换为占位符以避免长输出
    masked_message = [self.mask_base64_data(m) for m in message]
    logger.debug(masked_message)

    compressed_message = self.compress_messages(message, compress_type=self.config.compress_type)
    rsp = await self.acompletion_text(compressed_message, stream=stream, timeout=self.get_timeout(timeout))
    return rsp

消息处理流程:

  1. 消息组装: 系统消息 + 格式化消息 + 用户消息
  2. 多模态支持: 处理图像输入
  3. 日志优化: 屏蔽base64数据避免日志过长
  4. 消息压缩: 根据配置压缩长上下文
  5. 异步调用: 支持流式和非流式输出

BaseLLM.compress_messages() - 消息压缩

位置: metagpt/provider/base_llm.py:340

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def compress_messages(
    self,
    messages: list[dict],
    compress_type: CompressType = CompressType.NO_COMPRESS,
    max_token: int = 128000,
    threshold: float = 0.8,
) -> list[dict]:
    """Compress messages to fit within the token limit."""
    if compress_type == CompressType.NO_COMPRESS:
        return messages

    max_token = TOKEN_MAX.get(self.model, max_token)
    keep_token = int(max_token * threshold)
    compressed = []

    # Always keep system messages
    system_msg_val = self._system_msg("")["role"]
    system_msgs = []
    for i, msg in enumerate(messages):
        if msg["role"] == system_msg_val:
            system_msgs.append(msg)
        else:
            user_assistant_msgs = messages[i:]
            break
    
    compressed.extend(system_msgs)
    current_token_count = self.count_tokens(system_msgs)

    if compress_type in [CompressType.POST_CUT_BY_TOKEN, CompressType.POST_CUT_BY_MSG]:
        # 保留最新的消息
        for i, msg in enumerate(reversed(user_assistant_msgs)):
            token_count = self.count_tokens([msg])
            if current_token_count + token_count <= keep_token:
                compressed.insert(len(system_msgs), msg)
                current_token_count += token_count
            else:
                if compress_type == CompressType.POST_CUT_BY_TOKEN:
                    # 截断消息以适应剩余token数量
                    truncated_content = msg["content"][-(keep_token - current_token_count) :]
                    compressed.insert(len(system_msgs), {"role": msg["role"], "content": truncated_content})
                break

    return compressed

压缩策略:

  • POST_CUT: 保留最新消息,删除最旧的
  • PRE_CUT: 保留最旧消息,删除最新的
  • TOKEN级别: 精确到token的截断
  • MSG级别: 以完整消息为单位的截断

总结

MetaGPT关键函数的设计特点:

  1. 分层清晰: 从启动到执行的每一层都有明确的职责边界
  2. 异步优先: 大量使用async/await实现高并发
  3. 错误处理: 完善的异常处理和状态恢复机制
  4. 可扩展性: 通过接口和抽象类支持功能扩展
  5. 工程化: 日志、监控、成本控制等生产级特性

上述函数构成了完整的多智能体协作框架,实现复杂软件开发任务的自动化处理。


创建时间: 2025年09月10日

本文由 tommie blog 原创发布