1)整体架构概览
MetaGPT 是一个基于大语言模型的多智能体协作框架,模拟软件公司的开发流程,通过不同角色的智能体协作完成复杂的软件开发任务。

显示 Mermaid 源码
graph TB
subgraph "用户层"
CLI[命令行接口]
API[API接口]
Examples[示例应用]
end
subgraph "团队管理层"
Team[Team 团队]
Context[Context 上下文]
Config[Config 配置]
end
subgraph "环境层"
Environment[Environment 环境]
MGXEnv[MGX环境]
ExtEnv[扩展环境]
end
subgraph "角色层"
Role[Role 基础角色]
PM[ProductManager]
Arch[Architect]
Eng[Engineer]
TL[TeamLeader]
DA[DataAnalyst]
end
subgraph "动作层"
Action[Action 基础动作]
ActionNode[ActionNode]
WritePRD[写PRD]
WriteCode[写代码]
CodeReview[代码审查]
end
subgraph "基础设施层"
LLM[LLM提供者]
Memory[记忆系统]
Schema[消息模式]
Tools[工具集]
end
CLI --> Team
API --> Team
Examples --> Team
Team --> Environment
Team --> Role
Context --> Config
Environment --> Role
Role --> Action
Action --> ActionNode
Role --> Memory
Action --> LLM
Action --> Schema
LLM --> Tools</pre>
</details>
核心设计理念
- 分层架构:用户层 → 团队层 → 环境层 → 角色层 → 动作层 → 基础设施层
- 消息驱动:通过 Message 在角色间传递信息,实现异步协作
- 角色专业化:每个角色专注特定职责,如产品经理负责需求分析,架构师负责系统设计
- 动作原子化:将复杂任务分解为可复用的原子动作
- 记忆持久化:支持长期记忆和工作记忆,保持上下文连续性
2)启动流程与入口分析
2.1 主入口:software_company.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| def generate_repo(idea, investment=3.0, n_round=5, ...):
"""核心启动逻辑"""
# 1. 配置初始化
config.update_via_cli(project_path, project_name, inc, reqa_file, max_auto_summarize_code)
ctx = Context(config=config)
# 2. 团队组建
company = Team(context=ctx)
company.hire([
TeamLeader(),
ProductManager(),
Architect(),
Engineer2(),
DataAnalyst(),
])
# 3. 投资与运行
company.invest(investment)
asyncio.run(company.run(n_round=n_round, idea=idea))
|
关键路径函数:
generate_repo()
: 主启动函数,协调整个流程Team.__init__()
: 初始化团队环境和上下文Team.hire()
: 添加角色到环境中Team.run()
: 执行多轮协作循环
2.2 启动时序图

显示 Mermaid 源码
sequenceDiagram
participant CLI as 命令行
participant SC as SoftwareCompany
participant Team as Team
participant Env as Environment
participant Role as Role
participant LLM as LLM Provider
CLI->>SC: startup(idea, investment, n_round)
SC->>SC: generate_repo()
SC->>Team: Team(context=ctx)
Team->>Env: Environment(context=ctx)
SC->>Team: hire([TeamLeader, PM, Architect, Engineer])
loop 每个角色
Team->>Env: add_role(role)
Env->>Role: set_env(env)
Role->>LLM: 初始化LLM连接
end
SC->>Team: invest(investment)
SC->>Team: run(n_round, idea)
Team->>Env: publish_message(Message(idea))
loop n_round轮
Team->>Env: run()
loop 每个角色
Env->>Role: run()
Role->>Role: _observe()
Role->>Role: _think()
Role->>Role: _act()
Role->>Env: publish_message(response)
end
end</pre>
</details>
3)核心模块深度解析
3.1 Team(团队管理)
职责:统筹整个多智能体团队的运行,管理角色生命周期和资源分配。
关键路径函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| class Team(BaseModel):
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
"""运行团队直到目标轮数或资金耗尽"""
if idea:
self.run_project(idea=idea, send_to=send_to)
while n_round > 0:
if self.env.is_idle: # 所有角色都空闲
break
n_round -= 1
self._check_balance() # 检查预算
await self.env.run() # 环境运行一轮
self.env.archive(auto_archive) # 归档项目
return self.env.history
def hire(self, roles: list[Role]):
"""雇佣角色进行协作"""
self.env.add_roles(roles)
def invest(self, investment: float):
"""投资公司,设置预算上限"""
self.investment = investment
self.cost_manager.max_budget = investment
|
核心机制:
- 轮次控制:通过
n_round
控制最大执行轮数 - 预算管理:通过
CostManager
跟踪 LLM 调用成本 - 状态检查:通过
is_idle
判断是否所有角色完成工作 - 项目归档:自动保存项目历史和状态
3.2 Environment(环境系统)
职责:提供角色间通信的环境,管理消息路由和角色生命周期。
关键路径函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| class Environment(ExtEnv):
def publish_message(self, message: Message, peekable: bool = True) -> bool:
"""分发消息给接收者"""
found = False
# 根据 RFC 113 的路由特性计划
for role, addrs in self.member_addrs.items():
if is_send_to(message, addrs):
role.put_message(message) # 放入角色私有消息缓冲区
found = True
if not found:
logger.warning(f"Message no recipients: {message.dump()}")
self.history.add(message) # 调试用历史记录
return True
async def run(self, k=1):
"""处理一次所有角色的运行"""
for _ in range(k):
futures = []
for role in self.roles.values():
if role.is_idle:
continue
future = role.run()
futures.append(future)
if futures:
await asyncio.gather(*futures) # 并发执行所有角色
|
消息路由机制:
- 地址订阅:角色通过
addresses
订阅感兴趣的消息 - 异步分发:支持消息的异步分发和处理
- 历史记录:维护完整的消息历史用于调试和回溯
3.3 Role(角色系统)
职责:定义智能体的行为模式,实现观察-思考-行动循环。
关键路径函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
async def run(self, with_message=None) -> Message | None:
"""观察,思考并根据观察结果行动"""
if with_message:
# 处理外部输入消息
msg = self._process_input_message(with_message)
self.put_message(msg)
if not await self._observe():
# 如果没有新信息,暂停等待
return
rsp = await self.react() # 反应(思考+行动)
# 重置下一个要执行的动作
self.set_todo(None)
# 发送响应消息到环境
self.publish_message(rsp)
return rsp
async def _observe(self) -> int:
"""从消息缓冲区准备新消息进行处理"""
# 从消息缓冲区读取未处理的消息
news = self.rc.msg_buffer.pop_all()
# 存储到自己的记忆中防止重复处理
old_messages = self.rc.memory.get()
# 过滤感兴趣的消息
self.rc.news = [
n for n in news
if (n.cause_by in self.rc.watch or self.name in n.send_to)
and n not in old_messages
]
self.rc.memory.add_batch(self.rc.news)
return len(self.rc.news)
async def _think(self) -> bool:
"""考虑做什么并决定下一步行动"""
if len(self.actions) == 1:
# 如果只有一个动作,则只能执行这个
self._set_state(0)
return True
if self.rc.react_mode == RoleReactMode.BY_ORDER:
# 按顺序执行动作
self._set_state(self.rc.state + 1)
return self.rc.state >= 0 and self.rc.state < len(self.actions)
# 使用 LLM 动态选择动作
prompt = self._get_prefix() + STATE_TEMPLATE.format(
history=self.rc.history,
states="\n".join(self.states),
n_states=len(self.states) - 1,
previous_state=self.rc.state,
)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state)
self._set_state(int(next_state))
return True
async def _act(self) -> Message:
"""执行当前待办动作"""
response = await self.rc.todo.run(self.rc.history)
if isinstance(response, (ActionOutput, ActionNode)):
msg = AIMessage(
content=response.content,
instruct_content=response.instruct_content,
cause_by=self.rc.todo,
sent_from=self,
)
else:
msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)
self.rc.memory.add(msg)
return msg
|
三种反应模式:
- REACT:标准的思考-行动循环,使用 LLM 动态选择动作
- BY_ORDER:按预定义顺序执行动作
- PLAN_AND_ACT:先制定计划,然后执行动作序列
3.4 Action(动作系统)
职责:封装具体的任务执行逻辑,提供可复用的原子操作。
关键路径函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| class Action(SerializationMixin, ContextMixin, BaseModel):
async def run(self, *args, **kwargs):
"""运行动作"""
if self.node:
return await self._run_action_node(*args, **kwargs)
raise NotImplementedError("The run method should be implemented in a subclass.")
async def _run_action_node(self, *args, **kwargs):
"""运行动作节点"""
msgs = args[0]
context = "## History Messages\n"
context += "\n".join([f"{idx}: {i}" for idx, i in enumerate(reversed(msgs))])
return await self.node.fill(req=context, llm=self.llm)
async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
"""添加默认前缀的 LLM 调用"""
return await self.llm.aask(prompt, system_msgs)
|
3.5 ActionNode(结构化输出)
职责:提供结构化的 LLM 输出处理,支持复杂的数据验证和格式化。
关键路径函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| class ActionNode:
async def fill(self, *, req, llm, schema="json", mode="auto", **kwargs):
"""填充节点内容"""
self.set_llm(llm)
self.set_context(req)
if schema == "raw":
self.content = await self.llm.aask(
f"{req}\n\n## Actions\n{LANGUAGE_CONSTRAINT}\n{self.instruction}"
)
return self
# 编译提示模板
prompt = self.compile(context=req, schema=schema, mode=mode)
mapping = self.get_mapping(mode)
class_name = f"{self.key}_AN"
# 调用 LLM 并解析结构化输出
content, scontent = await self._aask_v1(
prompt, class_name, mapping, schema=schema
)
self.content = content
self.instruct_content = scontent
return self
def compile(self, context, schema="json", mode="children", template=SIMPLE_TEMPLATE):
"""编译提示模板"""
instruction = self.compile_instruction(schema="markdown", mode=mode)
example = self.compile_example(schema=schema, tag=TAG, mode=mode)
constraints = [LANGUAGE_CONSTRAINT, FORMAT_CONSTRAINT]
constraint = "\n".join(constraints)
return template.format(
context=context,
example=example,
instruction=instruction,
constraint=constraint,
)
|
结构化输出特性:
- 动态模型生成:基于字段定义动态创建 Pydantic 模型
- 多格式支持:支持 JSON、Markdown、XML 等输出格式
- 验证与解析:自动验证输出格式并解析为结构化数据
- 错误重试:支持输出格式错误时的自动重试
3.6 Memory(记忆系统)
职责:管理角色的短期和长期记忆,支持消息检索和历史回溯。
关键路径函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| class Memory(BaseModel):
def add(self, message: Message):
"""添加新消息到存储,同时更新索引"""
if message in self.storage:
return
self.storage.append(message)
if message.cause_by:
self.index[message.cause_by].append(message)
def get_by_actions(self, actions: Set) -> list[Message]:
"""返回由指定动作触发的所有消息"""
rsp = []
indices = any_to_str_set(actions)
for action in indices:
if action not in self.index:
continue
rsp += self.index[action]
return rsp
def find_news(self, observed: list[Message], k=0) -> list[Message]:
"""从最近的 k 条记忆中找到新消息(之前未见过的消息)"""
already_observed = self.get(k)
news = []
for i in observed:
if i in already_observed:
continue
news.append(i)
return news
|
记忆机制:
- 分层存储:支持工作记忆和长期记忆
- 索引优化:按动作类型建立索引,快速检索相关消息
- 增量更新:支持增量添加和批量操作
4)消息流与协作机制
4.1 消息路由时序图

显示 Mermaid 源码
sequenceDiagram
participant Role1 as 角色A
participant Env as Environment
participant Role2 as 角色B
participant Memory as Memory
Role1->>Role1: _act() 执行动作
Role1->>Role1: 生成 AIMessage
Role1->>Env: publish_message(msg)
Env->>Env: 检查消息路由规则
Env->>Role2: put_message(msg)
Env->>Memory: add(msg) 历史记录
Role2->>Role2: _observe() 观察新消息
Role2->>Role2: msg_buffer.pop_all()
Role2->>Role2: 过滤感兴趣的消息
Role2->>Memory: add_batch(news) 更新记忆
Role2->>Role2: _think() 思考下一步
Role2->>Role2: _act() 执行响应动作</pre>
</details>
4.2 协作模式
1. 流水线模式:
1
2
| ProductManager → Architect → Engineer → QaEngineer
(需求) → (设计) → (编码) → (测试)
|
2. 评审模式:
1
2
| Engineer → CodeReview → Engineer
(编码) → (评审) → (修改)
|
3. 团队协作模式:
1
| TeamLeader 统筹 → 各角色并行工作 → 结果汇总
|
5)扩展环境与专业化应用
5.1 MGX环境(多模态交互)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| class MGXEnv(Environment, SerializationMixin):
def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = ""):
"""让团队领导接管消息发布"""
message = self.attach_images(message) # 多模态消息支持
tl = self.get_role(TEAMLEADER_NAME) # TeamLeader 名为 Mike
if user_defined_recipient:
# 用户直接与某个角色对话
self.direct_chat_roles.add(role_name)
self._publish_message(message)
elif publicer == tl.profile:
# 团队领导处理的消息可以发布
self._publish_message(message)
else:
# 每个常规消息都通过团队领导
message.send_to.add(tl.name)
self._publish_message(message)
|
特性:
- 多模态支持:自动提取和编码图像
- 直接对话:支持用户与特定角色的直接交互
- 团队协调:TeamLeader 统筹消息流转
5.2 Android环境
支持 Android 应用的自动化测试和操作:
1
2
3
4
5
6
7
8
| class AndroidEnv(ExtEnv):
@mark_as_readable
def get_screenshot(self):
"""获取屏幕截图"""
@mark_as_writeable
def tap(self, x: int, y: int):
"""点击屏幕坐标"""
|
6)关键设计模式与最佳实践
6.1 观察者模式
角色通过 watch
机制订阅感兴趣的动作类型:
1
2
3
4
5
6
| def _watch(self, actions: Iterable[Type[Action]]):
"""观察感兴趣的动作"""
self.rc.watch = {any_to_str(t) for t in actions}
def is_watch(self, caused_by: str):
return caused_by in self.rc.watch
|
6.2 策略模式
支持多种反应策略:
1
2
3
4
| class RoleReactMode(str, Enum):
REACT = "react" # 动态选择动作
BY_ORDER = "by_order" # 按顺序执行
PLAN_AND_ACT = "plan_and_act" # 先计划后执行
|
6.3 工厂模式
LLM 提供者的动态创建:
1
2
3
4
5
6
7
| def create_llm_instance(config: LLMConfig) -> BaseLLM:
"""根据配置创建 LLM 实例"""
if config.api_type == LLMType.OPENAI:
return OpenAILLM(config)
elif config.api_type == LLMType.ANTHROPIC:
return AnthropicLLM(config)
# ...
|
6.4 序列化与持久化
支持团队状态的序列化和恢复:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| class Team(BaseModel):
def serialize(self, stg_path: Path = None):
"""序列化团队状态"""
serialized_data = self.model_dump()
serialized_data["context"] = self.env.context.serialize()
write_json_file(team_info_path, serialized_data)
@classmethod
def deserialize(cls, stg_path: Path, context: Context = None) -> "Team":
"""从序列化数据恢复团队"""
team_info = read_json_file(team_info_path)
ctx = context or Context()
ctx.deserialize(team_info.pop("context", None))
return Team(**team_info, context=ctx)
|
7)性能优化与工程实践
7.1 并发执行
环境支持角色的并发执行:
1
2
3
4
5
6
7
8
9
| async def run(self, k=1):
"""并发处理所有角色的运行"""
futures = []
for role in self.roles.values():
if not role.is_idle:
futures.append(role.run())
if futures:
await asyncio.gather(*futures)
|
7.2 成本控制
集成成本管理器跟踪 LLM 调用成本:
1
2
3
4
5
6
7
| class CostManager:
def update_cost(self, prompt_tokens: int, completion_tokens: int, model: str):
"""更新每次请求的 token 成本"""
def _check_balance(self):
if self.cost_manager.total_cost >= self.cost_manager.max_budget:
raise NoMoneyException(self.cost_manager.total_cost)
|
7.3 重试机制
ActionNode 支持自动重试:
1
2
3
4
5
6
7
| @retry(
wait=wait_random_exponential(min=1, max=20),
stop=stop_after_attempt(6),
after=general_after_log(logger),
)
async def _aask_v1(self, prompt: str, ...):
"""带重试的 LLM 调用"""
|
7.4 消息压缩
支持长上下文的消息压缩:
1
2
3
4
5
6
| def compress_messages(self, messages: list[dict], compress_type: CompressType = CompressType.NO_COMPRESS):
"""压缩消息以适应 token 限制"""
if compress_type == CompressType.POST_CUT_BY_TOKEN:
# 保留最新的消息
elif compress_type == CompressType.PRE_CUT_BY_TOKEN:
# 保留最早的消息
|
8)错误处理与边界行为
8.1 预算控制
- 预算检查:每轮执行前检查剩余预算
- 成本跟踪:实时跟踪 LLM 调用成本
- 异常处理:预算不足时抛出
NoMoneyException
8.2 消息路由失败
- 无接收者警告:消息无接收者时记录警告日志
- 地址验证:验证角色地址的有效性
- 消息重试:支持消息发送失败时的重试机制
8.3 LLM 调用失败
- 连接重试:网络连接失败时自动重试
- 格式验证:输出格式不正确时重新生成
- 降级处理:关键服务不可用时的降级策略
9)实战经验与优化建议
9.1 角色设计最佳实践
- 职责单一:每个角色专注特定领域,避免职责重叠
- 接口标准化:统一的消息格式和动作接口
- 状态管理:合理使用工作记忆和长期记忆
9.2 动作优化策略
- 提示工程:精心设计提示模板提高输出质量
- 结构化输出:使用 ActionNode 确保输出格式一致性
- 缓存机制:缓存常用的 LLM 响应减少调用成本
9.3 环境扩展指南
- API 注册:使用
@mark_as_readable
和 @mark_as_writeable
注册环境 API - 状态同步:确保环境状态与角色状态的一致性
- 错误隔离:环境错误不应影响其他组件
9.4 监控与调试
- 日志分级:使用不同级别的日志记录关键事件
- 消息追踪:完整记录消息流转路径
- 性能监控:监控 LLM 调用延迟和成本
10)流程回顾(完整时序)
- 启动阶段:
generate_repo()
→ 配置初始化 → 团队组建 → 角色雇佣 - 运行阶段:
Team.run()
→ 发布需求消息 → 多轮协作循环 - 协作循环:
Environment.run()
→ 并发执行角色 → 消息路由分发 - 角色执行:
Role.run()
→ 观察消息 → 思考决策 → 执行动作 → 发布响应 - 动作执行:
Action.run()
→ 构建提示 → LLM 调用 → 结构化解析 - 结果归档:项目完成后自动归档历史记录和状态
小结
MetaGPT 采用 分层架构 + 消息驱动 + 角色协作 的设计模式,实现复杂软件开发任务的自动化处理:
- 清晰的职责分离:每个组件都有明确的职责边界
- 灵活的扩展机制:支持新角色、新动作、新环境的轻松扩展
- 完善的错误处理:从预算控制到消息路由的全方位错误处理
- 工程化的实践:并发执行、成本控制、状态持久化等生产级特性
工程实现的关键要素:角色专业化、动作原子化、消息标准化、环境可扩展性。这些设计原则确保了MetaGPT在复杂多智能体协作场景下的稳定运行。
创建时间: 2025年9月10日
本文由 tommie blog 原创发布