1)整体架构概览

MetaGPT 是一个基于大语言模型的多智能体协作框架,模拟软件公司的开发流程,通过不同角色的智能体协作完成复杂的软件开发任务。

Mermaid Chart MetaGPT_源码走读文档-0

核心设计理念

  • 分层架构:用户层 → 团队层 → 环境层 → 角色层 → 动作层 → 基础设施层
  • 消息驱动:通过 Message 在角色间传递信息,实现异步协作
  • 角色专业化:每个角色专注特定职责,如产品经理负责需求分析,架构师负责系统设计
  • 动作原子化:将复杂任务分解为可复用的原子动作
  • 记忆持久化:支持长期记忆和工作记忆,保持上下文连续性

2)启动流程与入口分析

2.1 主入口:software_company.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def generate_repo(idea, investment=3.0, n_round=5, ...):
    """核心启动逻辑"""
    # 1. 配置初始化
    config.update_via_cli(project_path, project_name, inc, reqa_file, max_auto_summarize_code)
    ctx = Context(config=config)
    
    # 2. 团队组建
    company = Team(context=ctx)
    company.hire([
        TeamLeader(),
        ProductManager(),
        Architect(),
        Engineer2(),
        DataAnalyst(),
    ])
    
    # 3. 投资与运行
    company.invest(investment)
    asyncio.run(company.run(n_round=n_round, idea=idea))

关键路径函数

  • generate_repo(): 主启动函数,协调整个流程
  • Team.__init__(): 初始化团队环境和上下文
  • Team.hire(): 添加角色到环境中
  • Team.run(): 执行多轮协作循环

2.2 启动时序图

Mermaid Chart MetaGPT_源码走读文档-1

3)核心模块深度解析

3.1 Team(团队管理)

职责:统筹整个多智能体团队的运行,管理角色生命周期和资源分配。

关键路径函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Team(BaseModel):
    async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
        """运行团队直到目标轮数或资金耗尽"""
        if idea:
            self.run_project(idea=idea, send_to=send_to)
        
        while n_round > 0:
            if self.env.is_idle:  # 所有角色都空闲
                break
            n_round -= 1
            self._check_balance()  # 检查预算
            await self.env.run()   # 环境运行一轮
        
        self.env.archive(auto_archive)  # 归档项目
        return self.env.history
    
    def hire(self, roles: list[Role]):
        """雇佣角色进行协作"""
        self.env.add_roles(roles)
    
    def invest(self, investment: float):
        """投资公司,设置预算上限"""
        self.investment = investment
        self.cost_manager.max_budget = investment

核心机制

  • 轮次控制:通过 n_round 控制最大执行轮数
  • 预算管理:通过 CostManager 跟踪 LLM 调用成本
  • 状态检查:通过 is_idle 判断是否所有角色完成工作
  • 项目归档:自动保存项目历史和状态

3.2 Environment(环境系统)

职责:提供角色间通信的环境,管理消息路由和角色生命周期。

关键路径函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Environment(ExtEnv):
    def publish_message(self, message: Message, peekable: bool = True) -> bool:
        """分发消息给接收者"""
        found = False
        # 根据 RFC 113 的路由特性计划
        for role, addrs in self.member_addrs.items():
            if is_send_to(message, addrs):
                role.put_message(message)  # 放入角色私有消息缓冲区
                found = True
        
        if not found:
            logger.warning(f"Message no recipients: {message.dump()}")
        self.history.add(message)  # 调试用历史记录
        return True
    
    async def run(self, k=1):
        """处理一次所有角色的运行"""
        for _ in range(k):
            futures = []
            for role in self.roles.values():
                if role.is_idle:
                    continue
                future = role.run()
                futures.append(future)
            
            if futures:
                await asyncio.gather(*futures)  # 并发执行所有角色

消息路由机制

  • 地址订阅:角色通过 addresses 订阅感兴趣的消息
  • 异步分发:支持消息的异步分发和处理
  • 历史记录:维护完整的消息历史用于调试和回溯

3.3 Role(角色系统)

职责:定义智能体的行为模式,实现观察-思考-行动循环。

关键路径函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
    async def run(self, with_message=None) -> Message | None:
        """观察,思考并根据观察结果行动"""
        if with_message:
            # 处理外部输入消息
            msg = self._process_input_message(with_message)
            self.put_message(msg)
        
        if not await self._observe():
            # 如果没有新信息,暂停等待
            return
        
        rsp = await self.react()  # 反应(思考+行动)
        
        # 重置下一个要执行的动作
        self.set_todo(None)
        # 发送响应消息到环境
        self.publish_message(rsp)
        return rsp
    
    async def _observe(self) -> int:
        """从消息缓冲区准备新消息进行处理"""
        # 从消息缓冲区读取未处理的消息
        news = self.rc.msg_buffer.pop_all()
        # 存储到自己的记忆中防止重复处理
        old_messages = self.rc.memory.get()
        # 过滤感兴趣的消息
        self.rc.news = [
            n for n in news 
            if (n.cause_by in self.rc.watch or self.name in n.send_to) 
            and n not in old_messages
        ]
        self.rc.memory.add_batch(self.rc.news)
        return len(self.rc.news)
    
    async def _think(self) -> bool:
        """考虑做什么并决定下一步行动"""
        if len(self.actions) == 1:
            # 如果只有一个动作,则只能执行这个
            self._set_state(0)
            return True
        
        if self.rc.react_mode == RoleReactMode.BY_ORDER:
            # 按顺序执行动作
            self._set_state(self.rc.state + 1)
            return self.rc.state >= 0 and self.rc.state < len(self.actions)
        
        # 使用 LLM 动态选择动作
        prompt = self._get_prefix() + STATE_TEMPLATE.format(
            history=self.rc.history,
            states="\n".join(self.states),
            n_states=len(self.states) - 1,
            previous_state=self.rc.state,
        )
        
        next_state = await self.llm.aask(prompt)
        next_state = extract_state_value_from_output(next_state)
        self._set_state(int(next_state))
        return True
    
    async def _act(self) -> Message:
        """执行当前待办动作"""
        response = await self.rc.todo.run(self.rc.history)
        
        if isinstance(response, (ActionOutput, ActionNode)):
            msg = AIMessage(
                content=response.content,
                instruct_content=response.instruct_content,
                cause_by=self.rc.todo,
                sent_from=self,
            )
        else:
            msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)
        
        self.rc.memory.add(msg)
        return msg

三种反应模式

  • REACT:标准的思考-行动循环,使用 LLM 动态选择动作
  • BY_ORDER:按预定义顺序执行动作
  • PLAN_AND_ACT:先制定计划,然后执行动作序列

3.4 Action(动作系统)

职责:封装具体的任务执行逻辑,提供可复用的原子操作。

关键路径函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class Action(SerializationMixin, ContextMixin, BaseModel):
    async def run(self, *args, **kwargs):
        """运行动作"""
        if self.node:
            return await self._run_action_node(*args, **kwargs)
        raise NotImplementedError("The run method should be implemented in a subclass.")
    
    async def _run_action_node(self, *args, **kwargs):
        """运行动作节点"""
        msgs = args[0]
        context = "## History Messages\n"
        context += "\n".join([f"{idx}: {i}" for idx, i in enumerate(reversed(msgs))])
        return await self.node.fill(req=context, llm=self.llm)
    
    async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
        """添加默认前缀的 LLM 调用"""
        return await self.llm.aask(prompt, system_msgs)

3.5 ActionNode(结构化输出)

职责:提供结构化的 LLM 输出处理,支持复杂的数据验证和格式化。

关键路径函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ActionNode:
    async def fill(self, *, req, llm, schema="json", mode="auto", **kwargs):
        """填充节点内容"""
        self.set_llm(llm)
        self.set_context(req)
        
        if schema == "raw":
            self.content = await self.llm.aask(
                f"{req}\n\n## Actions\n{LANGUAGE_CONSTRAINT}\n{self.instruction}"
            )
            return self
        
        # 编译提示模板
        prompt = self.compile(context=req, schema=schema, mode=mode)
        mapping = self.get_mapping(mode)
        class_name = f"{self.key}_AN"
        
        # 调用 LLM 并解析结构化输出
        content, scontent = await self._aask_v1(
            prompt, class_name, mapping, schema=schema
        )
        
        self.content = content
        self.instruct_content = scontent
        return self
    
    def compile(self, context, schema="json", mode="children", template=SIMPLE_TEMPLATE):
        """编译提示模板"""
        instruction = self.compile_instruction(schema="markdown", mode=mode)
        example = self.compile_example(schema=schema, tag=TAG, mode=mode)
        constraints = [LANGUAGE_CONSTRAINT, FORMAT_CONSTRAINT]
        constraint = "\n".join(constraints)
        
        return template.format(
            context=context,
            example=example,
            instruction=instruction,
            constraint=constraint,
        )

结构化输出特性

  • 动态模型生成:基于字段定义动态创建 Pydantic 模型
  • 多格式支持:支持 JSON、Markdown、XML 等输出格式
  • 验证与解析:自动验证输出格式并解析为结构化数据
  • 错误重试:支持输出格式错误时的自动重试

3.6 Memory(记忆系统)

职责:管理角色的短期和长期记忆,支持消息检索和历史回溯。

关键路径函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Memory(BaseModel):
    def add(self, message: Message):
        """添加新消息到存储,同时更新索引"""
        if message in self.storage:
            return
        self.storage.append(message)
        if message.cause_by:
            self.index[message.cause_by].append(message)
    
    def get_by_actions(self, actions: Set) -> list[Message]:
        """返回由指定动作触发的所有消息"""
        rsp = []
        indices = any_to_str_set(actions)
        for action in indices:
            if action not in self.index:
                continue
            rsp += self.index[action]
        return rsp
    
    def find_news(self, observed: list[Message], k=0) -> list[Message]:
        """从最近的 k 条记忆中找到新消息(之前未见过的消息)"""
        already_observed = self.get(k)
        news = []
        for i in observed:
            if i in already_observed:
                continue
            news.append(i)
        return news

记忆机制

  • 分层存储:支持工作记忆和长期记忆
  • 索引优化:按动作类型建立索引,快速检索相关消息
  • 增量更新:支持增量添加和批量操作

4)消息流与协作机制

4.1 消息路由时序图

Mermaid Chart MetaGPT_源码走读文档-2

4.2 协作模式

1. 流水线模式

1
2
ProductManager → Architect → Engineer → QaEngineer
    (需求)    →   (设计)   →  (编码)  →   (测试)

2. 评审模式

1
2
Engineer → CodeReview → Engineer
 (编码)  →   (评审)   →  (修改)

3. 团队协作模式

1
TeamLeader 统筹 → 各角色并行工作 → 结果汇总

5)扩展环境与专业化应用

5.1 MGX环境(多模态交互)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class MGXEnv(Environment, SerializationMixin):
    def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = ""):
        """让团队领导接管消息发布"""
        message = self.attach_images(message)  # 多模态消息支持
        
        tl = self.get_role(TEAMLEADER_NAME)  # TeamLeader 名为 Mike
        
        if user_defined_recipient:
            # 用户直接与某个角色对话
            self.direct_chat_roles.add(role_name)
            self._publish_message(message)
        elif publicer == tl.profile:
            # 团队领导处理的消息可以发布
            self._publish_message(message)
        else:
            # 每个常规消息都通过团队领导
            message.send_to.add(tl.name)
            self._publish_message(message)

特性

  • 多模态支持:自动提取和编码图像
  • 直接对话:支持用户与特定角色的直接交互
  • 团队协调:TeamLeader 统筹消息流转

5.2 Android环境

支持 Android 应用的自动化测试和操作:

1
2
3
4
5
6
7
8
class AndroidEnv(ExtEnv):
    @mark_as_readable
    def get_screenshot(self):
        """获取屏幕截图"""
        
    @mark_as_writeable  
    def tap(self, x: int, y: int):
        """点击屏幕坐标"""

6)关键设计模式与最佳实践

6.1 观察者模式

角色通过 watch 机制订阅感兴趣的动作类型:

1
2
3
4
5
6
def _watch(self, actions: Iterable[Type[Action]]):
    """观察感兴趣的动作"""
    self.rc.watch = {any_to_str(t) for t in actions}

def is_watch(self, caused_by: str):
    return caused_by in self.rc.watch

6.2 策略模式

支持多种反应策略:

1
2
3
4
class RoleReactMode(str, Enum):
    REACT = "react"           # 动态选择动作
    BY_ORDER = "by_order"     # 按顺序执行
    PLAN_AND_ACT = "plan_and_act"  # 先计划后执行

6.3 工厂模式

LLM 提供者的动态创建:

1
2
3
4
5
6
7
def create_llm_instance(config: LLMConfig) -> BaseLLM:
    """根据配置创建 LLM 实例"""
    if config.api_type == LLMType.OPENAI:
        return OpenAILLM(config)
    elif config.api_type == LLMType.ANTHROPIC:
        return AnthropicLLM(config)
    # ...

6.4 序列化与持久化

支持团队状态的序列化和恢复:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class Team(BaseModel):
    def serialize(self, stg_path: Path = None):
        """序列化团队状态"""
        serialized_data = self.model_dump()
        serialized_data["context"] = self.env.context.serialize()
        write_json_file(team_info_path, serialized_data)
    
    @classmethod
    def deserialize(cls, stg_path: Path, context: Context = None) -> "Team":
        """从序列化数据恢复团队"""
        team_info = read_json_file(team_info_path)
        ctx = context or Context()
        ctx.deserialize(team_info.pop("context", None))
        return Team(**team_info, context=ctx)

7)性能优化与工程实践

7.1 并发执行

环境支持角色的并发执行:

1
2
3
4
5
6
7
8
9
async def run(self, k=1):
    """并发处理所有角色的运行"""
    futures = []
    for role in self.roles.values():
        if not role.is_idle:
            futures.append(role.run())
    
    if futures:
        await asyncio.gather(*futures)

7.2 成本控制

集成成本管理器跟踪 LLM 调用成本:

1
2
3
4
5
6
7
class CostManager:
    def update_cost(self, prompt_tokens: int, completion_tokens: int, model: str):
        """更新每次请求的 token 成本"""
        
def _check_balance(self):
    if self.cost_manager.total_cost >= self.cost_manager.max_budget:
        raise NoMoneyException(self.cost_manager.total_cost)

7.3 重试机制

ActionNode 支持自动重试:

1
2
3
4
5
6
7
@retry(
    wait=wait_random_exponential(min=1, max=20),
    stop=stop_after_attempt(6),
    after=general_after_log(logger),
)
async def _aask_v1(self, prompt: str, ...):
    """带重试的 LLM 调用"""

7.4 消息压缩

支持长上下文的消息压缩:

1
2
3
4
5
6
def compress_messages(self, messages: list[dict], compress_type: CompressType = CompressType.NO_COMPRESS):
    """压缩消息以适应 token 限制"""
    if compress_type == CompressType.POST_CUT_BY_TOKEN:
        # 保留最新的消息
    elif compress_type == CompressType.PRE_CUT_BY_TOKEN:
        # 保留最早的消息

8)错误处理与边界行为

8.1 预算控制

  • 预算检查:每轮执行前检查剩余预算
  • 成本跟踪:实时跟踪 LLM 调用成本
  • 异常处理:预算不足时抛出 NoMoneyException

8.2 消息路由失败

  • 无接收者警告:消息无接收者时记录警告日志
  • 地址验证:验证角色地址的有效性
  • 消息重试:支持消息发送失败时的重试机制

8.3 LLM 调用失败

  • 连接重试:网络连接失败时自动重试
  • 格式验证:输出格式不正确时重新生成
  • 降级处理:关键服务不可用时的降级策略

9)实战经验与优化建议

9.1 角色设计最佳实践

  • 职责单一:每个角色专注特定领域,避免职责重叠
  • 接口标准化:统一的消息格式和动作接口
  • 状态管理:合理使用工作记忆和长期记忆

9.2 动作优化策略

  • 提示工程:精心设计提示模板提高输出质量
  • 结构化输出:使用 ActionNode 确保输出格式一致性
  • 缓存机制:缓存常用的 LLM 响应减少调用成本

9.3 环境扩展指南

  • API 注册:使用 @mark_as_readable@mark_as_writeable 注册环境 API
  • 状态同步:确保环境状态与角色状态的一致性
  • 错误隔离:环境错误不应影响其他组件

9.4 监控与调试

  • 日志分级:使用不同级别的日志记录关键事件
  • 消息追踪:完整记录消息流转路径
  • 性能监控:监控 LLM 调用延迟和成本

10)流程回顾(完整时序)

  1. 启动阶段generate_repo() → 配置初始化 → 团队组建 → 角色雇佣
  2. 运行阶段Team.run() → 发布需求消息 → 多轮协作循环
  3. 协作循环Environment.run() → 并发执行角色 → 消息路由分发
  4. 角色执行Role.run() → 观察消息 → 思考决策 → 执行动作 → 发布响应
  5. 动作执行Action.run() → 构建提示 → LLM 调用 → 结构化解析
  6. 结果归档:项目完成后自动归档历史记录和状态

小结

MetaGPT 采用 分层架构 + 消息驱动 + 角色协作 的设计模式,实现复杂软件开发任务的自动化处理:

  • 清晰的职责分离:每个组件都有明确的职责边界
  • 灵活的扩展机制:支持新角色、新动作、新环境的轻松扩展
  • 完善的错误处理:从预算控制到消息路由的全方位错误处理
  • 工程化的实践:并发执行、成本控制、状态持久化等生产级特性

工程实现的关键要素:角色专业化、动作原子化、消息标准化、环境可扩展性。这些设计原则确保了MetaGPT在复杂多智能体协作场景下的稳定运行。


创建时间: 2025年9月10日

本文由 tommie blog 原创发布