核心执行路径函数深度解析
1. 启动入口函数
generate_repo()
- 主启动函数
位置: metagpt/software_company.py:14
职责: 协调整个软件开发流程的启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| def generate_repo(
idea,
investment=3.0,
n_round=5,
code_review=True,
run_tests=False,
implement=True,
project_name="",
inc=False,
project_path="",
reqa_file="",
max_auto_summarize_code=0,
recover_path=None,
):
|
关键步骤:
- 配置初始化:
config.update_via_cli()
- 解析命令行参数并更新配置 - 上下文创建:
Context(config=config)
- 创建全局上下文对象 - 团队组建:
Team(context=ctx)
- 初始化团队实例 - 角色雇佣:
company.hire([...])
- 添加各种专业角色 - 投资设置:
company.invest(investment)
- 设置预算限制 - 异步执行:
asyncio.run(company.run(...))
- 启动主执行循环
设计亮点:
- 支持从序列化状态恢复:
Team.deserialize(stg_path)
- 灵活的角色配置: 可选择性启用不同角色
- 预算控制机制: 防止无限制的LLM调用成本
2. 团队管理函数
Team.run()
- 团队主执行循环
位置: metagpt/team.py:123
职责: 控制整个团队的运行节奏和生命周期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| @serialize_decorator
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
"""Run company until target round or no money"""
if idea:
self.run_project(idea=idea, send_to=send_to)
while n_round > 0:
if self.env.is_idle:
logger.debug("All roles are idle.")
break
n_round -= 1
self._check_balance()
await self.env.run()
logger.debug(f"max {n_round=} left.")
self.env.archive(auto_archive)
return self.env.history
|
执行逻辑:
- 需求发布: 如果有新想法,通过
run_project()
发布到环境 - 轮次控制: 最多执行
n_round
轮,防止无限循环 - 空闲检测: 所有角色都空闲时提前结束
- 预算检查: 每轮检查是否超出预算限制
- 环境运行: 委托给环境执行一轮所有角色
- 自动归档: 完成后自动保存项目状态
关键机制:
@serialize_decorator
: 自动序列化团队状态- 预算控制:
_check_balance()
防止成本失控 - 优雅退出: 多种退出条件确保程序正常结束
Team.hire()
- 角色雇佣
位置: metagpt/team.py:83
1
2
3
| def hire(self, roles: list[Role]):
"""Hire roles to cooperate"""
self.env.add_roles(roles)
|
简洁设计: 直接委托给环境的add_roles()
方法,体现了单一职责原则。
3. 环境管理函数
Environment.run()
- 环境执行引擎
位置: metagpt/environment/base_env.py:197
职责: 并发执行所有活跃角色
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def run(self, k=1):
"""处理一次所有信息的运行
Process all Role runs at once
"""
for _ in range(k):
futures = []
for role in self.roles.values():
if role.is_idle:
continue
future = role.run()
futures.append(future)
if futures:
await asyncio.gather(*futures)
logger.debug(f"is idle: {self.is_idle}")
|
并发设计:
- 异步收集: 收集所有非空闲角色的执行任务
- 并发执行: 使用
asyncio.gather()
并发执行所有角色 - 性能优化: 跳过空闲角色,避免不必要的等待
Environment.publish_message()
- 消息路由核心
位置: metagpt/environment/base_env.py:175
职责: 实现消息的智能路由分发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| def publish_message(self, message: Message, peekable: bool = True) -> bool:
"""
Distribute the message to the recipients.
"""
logger.debug(f"publish_message: {message.dump()}")
found = False
# 根据 RFC 113 的路由特性计划
for role, addrs in self.member_addrs.items():
if is_send_to(message, addrs):
role.put_message(message)
found = True
if not found:
logger.warning(f"Message no recipients: {message.dump()}")
self.history.add(message) # For debug
return True
|
路由算法:
- 地址匹配: 检查消息的
send_to
是否匹配角色的订阅地址 - 消息投递: 将消息放入匹配角色的私有消息缓冲区
- 异常处理: 记录无接收者的消息警告
- 历史记录: 保存所有消息用于调试和回溯
设计特点:
- 基于RFC 113的路由设计,职责清晰
- 支持一对多广播和点对点通信
- 完整的消息追踪和调试支持
4. 角色执行函数
Role.run()
- 角色主执行入口
位置: metagpt/roles/role.py:530
职责: 实现角色的完整执行周期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| @role_raise_decorator
async def run(self, with_message=None) -> Message | None:
"""Observe, and think and act based on the results of the observation"""
if with_message:
msg = self._process_input_message(with_message)
self.put_message(msg)
if not await self._observe():
# If there is no new information, suspend and wait
logger.debug(f"{self._setting}: no news. waiting.")
return
rsp = await self.react()
# Reset the next action to be taken.
self.set_todo(None)
# Send the response message to the Environment object
self.publish_message(rsp)
return rsp
|
执行流程:
- 消息预处理: 处理外部输入消息
- 观察阶段:
_observe()
检查新消息 - 反应阶段:
react()
思考并执行动作 - 状态重置: 清理当前任务状态
- 结果发布: 将响应发布到环境
Role._observe()
- 消息观察机制
位置: metagpt/roles/role.py:399
职责: 从消息缓冲区筛选和处理新消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| async def _observe(self) -> int:
"""Prepare new messages for processing from the message buffer and other sources."""
# Read unprocessed messages from the msg buffer.
news = []
if self.recovered and self.latest_observed_msg:
news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)
if not news:
news = self.rc.msg_buffer.pop_all()
# Store the read messages in your own memory to prevent duplicate processing.
old_messages = [] if not self.enable_memory else self.rc.memory.get()
# Filter in messages of interest.
self.rc.news = [
n for n in news
if (n.cause_by in self.rc.watch or self.name in n.send_to)
and n not in old_messages
]
if self.observe_all_msg_from_buffer:
self.rc.memory.add_batch(news)
else:
self.rc.memory.add_batch(self.rc.news)
self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None
return len(self.rc.news)
|
过滤逻辑:
- 消息获取: 从缓冲区获取所有未处理消息
- 重复检测: 与已有记忆对比,避免重复处理
- 兴趣过滤: 只关注订阅的动作类型或直接发送给自己的消息
- 记忆更新: 将新消息添加到记忆系统
- 状态跟踪: 记录最后观察到的消息用于恢复
Role._think()
- 决策思考机制
位置: metagpt/roles/role.py:340
职责: 根据当前状态和历史决定下一步动作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| async def _think(self) -> bool:
"""Consider what to do and decide on the next course of action."""
if len(self.actions) == 1:
# If there is only one action, then only this one can be performed
self._set_state(0)
return True
if self.recovered and self.rc.state >= 0:
self._set_state(self.rc.state) # action to run from recovered state
self.recovered = False
return True
if self.rc.react_mode == RoleReactMode.BY_ORDER:
if self.rc.max_react_loop != len(self.actions):
self.rc.max_react_loop = len(self.actions)
self._set_state(self.rc.state + 1)
return self.rc.state >= 0 and self.rc.state < len(self.actions)
# 使用LLM动态选择动作
prompt = self._get_prefix() + STATE_TEMPLATE.format(
history=self.rc.history,
states="\n".join(self.states),
n_states=len(self.states) - 1,
previous_state=self.rc.state,
)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state)
if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):
logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
next_state = -1
else:
next_state = int(next_state)
if next_state == -1:
logger.info(f"End actions with {next_state=}")
self._set_state(next_state)
return True
|
决策策略:
- 单动作模式: 只有一个动作时直接执行
- 恢复模式: 从序列化状态恢复时继续之前的动作
- 顺序模式: 按预定义顺序执行动作
- 智能模式: 使用LLM根据历史和当前状态选择最佳动作
状态管理:
- 状态验证确保选择的动作索引有效
- 支持-1状态表示任务完成
- 完善的错误处理和日志记录
Role._act()
- 动作执行机制
位置: metagpt/roles/role.py:381
职责: 执行当前选定的动作并生成响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
response = await self.rc.todo.run(self.rc.history)
if isinstance(response, (ActionOutput, ActionNode)):
msg = AIMessage(
content=response.content,
instruct_content=response.instruct_content,
cause_by=self.rc.todo,
sent_from=self,
)
elif isinstance(response, Message):
msg = response
else:
msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)
self.rc.memory.add(msg)
return msg
|
响应处理:
- 动作执行: 调用当前待办动作的
run()
方法 - 响应封装: 根据响应类型创建合适的消息对象
- 记忆更新: 将响应添加到角色记忆中
- 消息返回: 返回格式化的消息对象
5. 动作执行函数
Action.run()
- 动作执行入口
位置: metagpt/actions/action.py:110
职责: 执行具体的动作逻辑
1
2
3
4
5
| async def run(self, *args, **kwargs):
"""Run action"""
if self.node:
return await self._run_action_node(*args, **kwargs)
raise NotImplementedError("The run method should be implemented in a subclass.")
|
Action._run_action_node()
- ActionNode执行
位置: metagpt/actions/action.py:103
1
2
3
4
5
6
| async def _run_action_node(self, *args, **kwargs):
"""Run action node"""
msgs = args[0]
context = "## History Messages\n"
context += "\n".join([f"{idx}: {i}" for idx, i in enumerate(reversed(msgs))])
return await self.node.fill(req=context, llm=self.llm)
|
上下文构建:
- 将历史消息格式化为上下文
- 按时间倒序排列,最新消息在前
- 委托给ActionNode进行结构化处理
6. ActionNode核心函数
ActionNode.fill()
- 结构化填充核心
位置: metagpt/actions/action_node.py:597
职责: 实现结构化的LLM输出处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| @exp_cache(serializer=ActionNodeSerializer())
async def fill(
self,
*,
req,
llm,
schema="json",
mode="auto",
strgy="simple",
images: Optional[Union[str, list[str]]] = None,
timeout=USE_CONFIG_TIMEOUT,
exclude=[],
function_name: str = None,
):
"""Fill the node(s) with mode."""
self.set_llm(llm)
self.set_context(req)
if self.schema:
schema = self.schema
if mode == FillMode.CODE_FILL.value:
result = await self.code_fill(context, function_name, timeout)
self.instruct_content = self.create_class()(**result)
return self
elif mode == FillMode.XML_FILL.value:
context = self.xml_compile(context=self.context)
result = await self.xml_fill(context, images=images)
self.instruct_content = self.create_class()(**result)
return self
elif mode == FillMode.SINGLE_FILL.value:
result = await self.single_fill(context, images=images)
self.instruct_content = self.create_class()(**result)
return self
if strgy == "simple":
return await self.simple_fill(schema=schema, mode=mode, images=images, timeout=timeout, exclude=exclude)
elif strgy == "complex":
# 复杂策略:逐个处理子节点
tmp = {}
for _, i in self.children.items():
if exclude and i.key in exclude:
continue
child = await i.simple_fill(schema=schema, mode=mode, images=images, timeout=timeout, exclude=exclude)
tmp.update(child.instruct_content.model_dump())
cls = self._create_children_class()
self.instruct_content = cls(**tmp)
return self
|
多模式支持:
- CODE_FILL: 专门处理代码生成,支持函数名提取
- XML_FILL: XML格式的结构化输出
- SINGLE_FILL: 单一字段填充
- 标准模式: JSON/Markdown结构化输出
策略选择:
- simple: 一次性处理所有字段
- complex: 逐个处理子节点,适合复杂嵌套结构
ActionNode.compile()
- 提示模板编译
位置: metagpt/actions/action_node.py:382
职责: 将节点定义编译为LLM提示模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| def compile(self, context, schema="json", mode="children", template=SIMPLE_TEMPLATE, exclude=[]) -> str:
"""
编译提示模板
mode: all/root/children
schema: raw/json/markdown
"""
if schema == "raw":
return f"{context}\n\n## Actions\n{LANGUAGE_CONSTRAINT}\n{self.instruction}"
instruction = self.compile_instruction(schema="markdown", mode=mode, exclude=exclude)
example = self.compile_example(schema=schema, tag=TAG, mode=mode, exclude=exclude)
constraints = [LANGUAGE_CONSTRAINT, FORMAT_CONSTRAINT]
constraint = "\n".join(constraints)
prompt = template.format(
context=context,
example=example,
instruction=instruction,
constraint=constraint,
)
return prompt
|
模板组成:
- 上下文: 历史消息和当前请求
- 指令: 字段定义和类型约束
- 示例: 期望的输出格式示例
- 约束: 语言和格式约束
7. 记忆管理函数
Memory.add()
- 消息添加
位置: metagpt/memory/memory.py:27
1
2
3
4
5
6
7
8
9
| def add(self, message: Message):
"""Add a new message to storage, while updating the index"""
if self.ignore_id:
message.id = IGNORED_MESSAGE_ID
if message in self.storage:
return
self.storage.append(message)
if message.cause_by:
self.index[message.cause_by].append(message)
|
索引维护:
- 按动作类型建立倒排索引
- 避免重复消息
- 支持ID忽略模式
Memory.get_by_actions()
- 按动作检索
位置: metagpt/memory/memory.py:99
1
2
3
4
5
6
7
8
9
| def get_by_actions(self, actions: Set) -> list[Message]:
"""Return all messages triggered by specified Actions"""
rsp = []
indices = any_to_str_set(actions)
for action in indices:
if action not in self.index:
continue
rsp += self.index[action]
return rsp
|
高效检索:
- 利用倒排索引快速定位
- 支持多动作类型查询
- 自动处理不存在的索引
8. LLM调用函数
BaseLLM.aask()
- 异步LLM调用
位置: metagpt/provider/base_llm.py:179
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| async def aask(
self,
msg: Union[str, list[dict[str, str]]],
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
images: Optional[Union[str, list[str]]] = None,
timeout=USE_CONFIG_TIMEOUT,
stream=None,
) -> str:
if system_msgs:
message = self._system_msgs(system_msgs)
else:
message = [self._default_system_msg()]
if not self.use_system_prompt:
message = []
if format_msgs:
message.extend(format_msgs)
if isinstance(msg, str):
message.append(self._user_msg(msg, images=images))
else:
message.extend(msg)
if stream is None:
stream = self.config.stream
# 图像数据替换为占位符以避免长输出
masked_message = [self.mask_base64_data(m) for m in message]
logger.debug(masked_message)
compressed_message = self.compress_messages(message, compress_type=self.config.compress_type)
rsp = await self.acompletion_text(compressed_message, stream=stream, timeout=self.get_timeout(timeout))
return rsp
|
消息处理流程:
- 消息组装: 系统消息 + 格式化消息 + 用户消息
- 多模态支持: 处理图像输入
- 日志优化: 屏蔽base64数据避免日志过长
- 消息压缩: 根据配置压缩长上下文
- 异步调用: 支持流式和非流式输出
BaseLLM.compress_messages()
- 消息压缩
位置: metagpt/provider/base_llm.py:340
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| def compress_messages(
self,
messages: list[dict],
compress_type: CompressType = CompressType.NO_COMPRESS,
max_token: int = 128000,
threshold: float = 0.8,
) -> list[dict]:
"""Compress messages to fit within the token limit."""
if compress_type == CompressType.NO_COMPRESS:
return messages
max_token = TOKEN_MAX.get(self.model, max_token)
keep_token = int(max_token * threshold)
compressed = []
# Always keep system messages
system_msg_val = self._system_msg("")["role"]
system_msgs = []
for i, msg in enumerate(messages):
if msg["role"] == system_msg_val:
system_msgs.append(msg)
else:
user_assistant_msgs = messages[i:]
break
compressed.extend(system_msgs)
current_token_count = self.count_tokens(system_msgs)
if compress_type in [CompressType.POST_CUT_BY_TOKEN, CompressType.POST_CUT_BY_MSG]:
# 保留最新的消息
for i, msg in enumerate(reversed(user_assistant_msgs)):
token_count = self.count_tokens([msg])
if current_token_count + token_count <= keep_token:
compressed.insert(len(system_msgs), msg)
current_token_count += token_count
else:
if compress_type == CompressType.POST_CUT_BY_TOKEN:
# 截断消息以适应剩余token数量
truncated_content = msg["content"][-(keep_token - current_token_count) :]
compressed.insert(len(system_msgs), {"role": msg["role"], "content": truncated_content})
break
return compressed
|
压缩策略:
- POST_CUT: 保留最新消息,删除最旧的
- PRE_CUT: 保留最旧消息,删除最新的
- TOKEN级别: 精确到token的截断
- MSG级别: 以完整消息为单位的截断
总结
MetaGPT关键函数的设计特点:
- 分层清晰: 从启动到执行的每一层都有明确的职责边界
- 异步优先: 大量使用async/await实现高并发
- 错误处理: 完善的异常处理和状态恢复机制
- 可扩展性: 通过接口和抽象类支持功能扩展
- 工程化: 日志、监控、成本控制等生产级特性
上述函数构成了完整的多智能体协作框架,实现复杂软件开发任务的自动化处理。
创建时间: 2025年09月10日
本文由 tommie blog 原创发布