概述
本文档专门分析AutoGen系统中各组件间的交互流程,通过详细的时序图展示消息传递、代理生命周期、团队协作等关键流程的实现机制。
1. 代理生命周期管理时序
1.1 代理创建和注册流程 (基于实际代码实现)
sequenceDiagram
participant App as 应用程序
participant RT as AgentRuntime
participant AIC as AgentInstantiationContext
participant A as Agent实例
participant SM as SubscriptionManager
participant SR as SerializationRegistry
Note over App,SR: 代理工厂注册流程 (BaseAgent.register)
App->>RT: register_factory(type="ChatAgent", factory_func)
activate RT
RT->>RT: 验证代理类型和工厂函数
RT->>RT: 存储到 _agent_factories[type]
Note over RT: 处理类级订阅 (skip_class_subscriptions=False)
RT->>RT: 创建 SubscriptionInstantiationContext
RT->>A: 调用 cls._unbound_subscriptions()
A->>RT: 返回订阅列表
loop 为每个订阅
RT->>SM: add_subscription(subscription)
SM->>SM: 存储订阅规则
end
Note over RT: 添加直接消息订阅
RT->>SM: add_subscription(TypePrefixSubscription(type + ":"))
Note over RT: 注册消息序列化器
RT->>A: 调用 cls._handles_types()
A->>RT: 返回处理的消息类型
loop 为每个消息类型
RT->>SR: add_message_serializer(serializer)
end
RT->>App: 返回 AgentType
deactivate RT
Note over App,SR: 代理实例按需创建 (EnsureAgentAsync)
App->>RT: send_message(message, AgentId("ChatAgent", "user123"))
activate RT
RT->>RT: 检查 agentInstances.TryGetValue(agentId)
alt 代理不存在
RT->>RT: 从 agentFactories 获取工厂函数
Note over RT,AIC: 创建实例化上下文
RT->>AIC: populate_context(agentId, runtime)
activate AIC
RT->>RT: 调用 factory_func()
RT->>A: 创建代理实例
activate A
A->>AIC: current_runtime() / current_agent_id()
AIC->>A: 返回运行时和代理ID
A->>A: 绑定 _runtime 和 _id
deactivate AIC
A->>RT: 返回代理实例
Note over RT: 即时注册消息类型 (.NET特性)
RT->>A: RegisterHandledMessageTypes(serializationRegistry)
A->>SR: 注册处理的消息类型
RT->>RT: agentInstances.Add(agentId, agent)
deactivate A
else 代理已存在
RT->>RT: 从缓存返回代理实例
end
RT->>A: OnMessageAsync(message, messageContext)
activate A
A->>A: 路由到相应的消息处理器
A->>RT: 返回处理结果
deactivate A
RT->>App: 返回最终响应
deactivate RT
1.2 代理状态管理时序
sequenceDiagram
participant A as Agent
participant RT as Runtime
participant SM as StateManager
participant PS as PersistentStore
participant MON as Monitor
Note over A,MON: 代理状态保存流程
A->>RT: 触发状态保存
RT->>SM: save_agent_state(agent_id)
activate SM
SM->>A: 获取当前状态
A->>SM: 返回状态数据
SM->>SM: 序列化状态
SM->>PS: 持久化状态数据
activate PS
PS->>PS: 写入存储
PS->>SM: 保存成功确认
deactivate PS
SM->>MON: 记录状态保存事件
MON->>MON: 更新监控指标
SM->>RT: 状态保存完成
deactivate SM
RT->>A: 保存成功通知
Note over A,MON: 代理状态恢复流程
RT->>SM: load_agent_state(agent_id, state)
activate SM
SM->>PS: 读取状态数据
activate PS
PS->>SM: 返回状态数据
deactivate PS
SM->>SM: 反序列化状态
SM->>A: 加载状态到代理
activate A
A->>A: 恢复内部状态
A->>SM: 状态恢复完成
deactivate A
SM->>MON: 记录状态恢复事件
SM->>RT: 状态恢复完成
deactivate SM
2. 消息传递与路由时序
2.1 点对点消息传递
sequenceDiagram
participant S as Sender代理
participant SRT as Sender运行时
participant MQ as 消息队列
participant TRT as Target运行时
participant T as Target代理
participant IH as 干预处理器
Note over S,IH: 点对点消息发送流程
S->>SRT: send_message(message, recipient)
activate SRT
SRT->>IH: on_send_message(message, sender, recipient)
activate IH
IH->>IH: 检查和转换消息
IH->>SRT: 返回处理后的消息
deactivate IH
SRT->>SRT: 创建SendMessageEnvelope
SRT->>MQ: 加入消息队列
activate MQ
MQ->>MQ: 队列处理消息
MQ->>TRT: 投递消息
activate TRT
TRT->>TRT: 解析目标代理ID
TRT->>T: 获取或创建目标代理
activate T
TRT->>T: on_message(message, context)
T->>T: 处理消息逻辑
T->>TRT: 返回处理结果
deactivate T
TRT->>MQ: 返回响应
deactivate TRT
MQ->>SRT: 传递响应
deactivate MQ
SRT->>S: 返回最终结果
deactivate SRT
2.2 发布订阅消息传递 (基于InProcessRuntime.PublishMessageServicer实现)
sequenceDiagram
participant P as Publisher
participant RT as AgentRuntime
participant PE as PublishMessageEnvelope
participant SM as SubscriptionManager
participant A1 as Agent1
participant A2 as Agent2
participant EH as ExceptionHandler
Note over P,EH: 发布订阅消息完整流程
P->>RT: publish_message(message, TopicId("chat.message", "user123"))
activate RT
RT->>PE: 创建MessageEnvelope.ForPublish(topic, PublishMessageServicer)
RT->>RT: messageDeliveryQueue.Enqueue(delivery)
Note over RT,SM: PublishMessageServicer 核心逻辑
RT->>RT: 验证 envelope.Topic.HasValue
RT->>SM: 遍历 subscriptions.Values
activate SM
loop 对每个订阅
SM->>SM: subscription.Matches(topic)
alt 订阅匹配
SM->>SM: subscription.MapToAgent(topic)
Note over SM: 生成 AgentId(agentType, topicSource)
SM->>RT: 返回目标AgentId
RT->>RT: 检查 DeliverToSelf 和发送方
alt 需要投递
RT->>RT: 创建 MessageContext(messageId, cancellationToken)
RT->>RT: 设置 Sender, Topic, IsRpc=false
RT->>A1: EnsureAgentAsync(agentId)
activate A1
RT->>A1: OnMessageAsync(envelope.Message, messageContext)
A1->>A1: 处理发布的消息
A1->>RT: 处理完成
deactivate A1
else 跳过投递
Note over RT: 自投递或其他过滤条件
end
else 订阅不匹配
Note over SM: 跳过此订阅
end
end
deactivate SM
Note over RT,EH: 异常聚合处理
alt 有异常发生
RT->>EH: 收集所有处理异常
EH->>EH: 创建 AggregateException
EH->>RT: 抛出聚合异常
RT->>P: 传播异常
else 全部成功
RT->>P: 发布完成
end
deactivate RT
订阅匹配算法详解
基于实际的 TypeSubscription.Matches
和 MapToAgent
实现:
flowchart TD
A[收到TopicId] --> B{TypeSubscription匹配}
B -->|topic.Type == _topicType| C[精确匹配成功]
B -->|不匹配| D[跳过此订阅]
C --> E[调用MapToAgent]
E --> F[创建AgentId]
F --> G[return AgentId(_agentType, topic.Source)]
H[TypePrefixSubscription匹配] --> I{topic.Type.StartsWith(_topicTypePrefix)}
I -->|匹配| J[前缀匹配成功]
I -->|不匹配| K[跳过此订阅]
J --> L[调用MapToAgent]
L --> M[return AgentId(_agentType, topic.Source)]
style C fill:#e1f5fe
style J fill:#f3e5f5
style G fill:#e8f5e8
style M fill:#e8f5e8
2.3 gRPC分布式消息传递
sequenceDiagram
participant C as Client代理
participant CR as Client运行时
participant GMR as gRPC消息路由器
participant GW as Gateway服务
participant TW as Target Worker
participant TA as Target代理
Note over C,TA: gRPC分布式消息传递流程
C->>CR: send_message(message, remote_agent_id)
activate CR
CR->>GMR: 发起gRPC调用
activate GMR
GMR->>GMR: 序列化消息为RpcRequest
GMR->>GW: gRPC调用: OpenChannel.send(RpcRequest)
activate GW
GW->>GW: 查找目标Worker
GW->>TW: 转发RpcRequest
activate TW
TW->>TW: 反序列化消息
TW->>TA: 获取/创建目标代理
activate TA
TW->>TA: on_message_async(message, context)
TA->>TA: 处理业务逻辑
TA->>TW: 返回处理结果
deactivate TA
TW->>TW: 序列化响应为RpcResponse
TW->>GW: 返回RpcResponse
deactivate TW
GW->>GMR: gRPC响应: RpcResponse
deactivate GW
GMR->>GMR: 反序列化响应
GMR->>CR: 返回处理结果
deactivate GMR
CR->>C: 返回最终结果
deactivate CR
3. 团队协作交互流程
3.1 GroupChat团队协作时序 (基于ChatAgentContainer实现)
sequenceDiagram
participant U as 用户
participant GC as GroupChatBase
participant RL as RuntimeLayer
participant RT as InProcessRuntime
participant OS as OutputSink
participant MGR as GroupChatManager
participant C1 as ChatAgentContainer1
participant C2 as ChatAgentContainer2
participant A1 as ChatAgent1
participant A2 as ChatAgent2
Note over U,A2: GroupChat完整初始化和执行流程
U->>GC: run_stream(task="协作创作文章")
activate GC
Note over GC,RT: RuntimeLayer初始化 (CreateRuntime)
GC->>RL: 检查InitOnceTask是否为null
alt 首次初始化
RL->>RT: 创建InProcessRuntime()
activate RT
Note over RL: 注册所有参与者
loop 对每个参与者配置
RL->>RT: RegisterChatAgentAsync(config)
RT->>RT: 注册代理工厂和订阅
end
Note over RL: 注册群聊管理器
RL->>RT: RegisterGroupChatManagerAsync(options, teamId, CreateChatManager)
RT->>MGR: 创建GroupChatManager实例
Note over RL: 注册输出收集器
RL->>OS: 创建OutputSink()
RL->>RT: RegisterOutputCollectorAsync(outputSink, outputTopicType)
deactivate RT
end
Note over GC,MGR: 启动团队协作
GC->>RT: 启动运行时
GC->>MGR: 发送初始任务消息
activate MGR
Note over MGR: 群聊管理器协调逻辑
MGR->>C1: publish_message(GroupChatRequestPublish)
MGR->>C2: publish_message(GroupChatRequestPublish)
Note over C1,A1: ChatAgentContainer.handle_request处理
C1->>C1: 接收GroupChatRequestPublish事件
alt agent是ChatAgent
C1->>A1: 检查self._agent类型
C1->>A1: on_messages_stream(self._message_buffer, cancellation_token)
activate A1
loop 流式处理
A1->>A1: 处理消息并生成内容
A1->>C1: yield中间消息或事件
C1->>C1: _log_message(msg)
end
A1->>C1: yield Response(chat_message=生成的内容)
deactivate A1
C1->>C1: _message_buffer.clear()
C1->>MGR: publish_message(GroupChatAgentResponse(response, name))
else agent是Team
C1->>A1: 构建task_content from _message_buffer
C1->>A1: team.run_stream(task=task_content)
C1->>MGR: publish_message(GroupChatTeamResponse)
end
Note over C2,A2: 同时处理参与者2
C2->>A2: 类似的处理流程
A2->>C2: 生成响应
C2->>MGR: publish_message(GroupChatAgentResponse)
Note over MGR: 收集所有响应并决定下一步
MGR->>MGR: 检查终止条件
alt 未达到终止条件
MGR->>C1: 发送下一轮GroupChatRequestPublish
MGR->>C2: 发送下一轮GroupChatRequestPublish
Note over MGR: 继续协作循环
else 达到终止条件
MGR->>OS: 发送最终结果到OutputSink
MGR->>GC: 协作完成
deactivate MGR
end
GC->>U: 流式返回TaskResult
deactivate GC
代理容器消息缓冲机制
基于 ChatAgentContainer._buffer_message
实现:
sequenceDiagram
participant C as ChatAgentContainer
participant MB as MessageBuffer
participant MF as MessageFactory
participant A as WrappedAgent
Note over C,A: 消息缓冲和处理机制
C->>C: 接收GroupChatAgentResponse事件
alt message.agent_name != self._agent.name
C->>MF: create_from_data(message.response)
activate MF
MF->>MF: 根据type字段创建消息实例
MF->>C: 返回BaseChatMessage实例
deactivate MF
C->>MB: _message_buffer.append(chat_message)
Note over C: 消息已缓冲,等待下次处理
else 自己的消息
Note over C: 跳过自己发送的消息
end
Note over C,A: 处理请求时使用缓冲消息
C->>A: on_messages_stream(self._message_buffer.copy(), cancellation_token)
A->>A: 基于历史消息生成响应
A->>C: 返回Response
C->>MB: _message_buffer.clear()
Note over MB: 清空缓冲区,准备下轮对话
3.2 Swarm代理切换时序
sequenceDiagram
participant U as 用户
participant SWARM as Swarm团队
participant A1 as 路由代理
participant A2 as 专家代理1
participant A3 as 专家代理2
participant HM as HandoffManager
Note over U,HM: Swarm代理动态切换流程
U->>SWARM: run(task="复杂技术问题")
activate SWARM
SWARM->>A1: 初始路由代理处理
activate A1
A1->>A1: 分析任务复杂度
A1->>A1: 识别需要专家介入
A1->>SWARM: HandoffMessage(target="技术专家1")
deactivate A1
SWARM->>HM: 处理代理切换
activate HM
HM->>HM: 验证切换合法性
HM->>A2: 激活技术专家1
activate A2
HM->>A2: 传递上下文和任务
deactivate HM
A2->>A2: 技术问题
A2->>A2: 发现需要更专业的专家
A2->>SWARM: HandoffMessage(target="技术专家2", context="详细分析结果")
deactivate A2
SWARM->>HM: 处理二次切换
activate HM
HM->>A3: 激活技术专家2
activate A3
HM->>A3: 传递完整上下文
deactivate HM
A3->>A3: 提供最终技术解决方案
A3->>SWARM: 最终响应 + TERMINATE
deactivate A3
SWARM->>U: 返回解决方案
deactivate SWARM
4. 工具调用与执行时序
4.1 工具调用完整流程 (基于AssistantAgent._execute_tool_call实现)
sequenceDiagram
participant A as AssistantAgent
participant MC as ModelClient
participant WB as Workbench
participant HT as HandoffTools
participant T as Tool实例
participant SQ as StreamQueue
participant CT as CancellationToken
Note over A,CT: AssistantAgent工具调用完整流程
A->>MC: create(messages, tools=[tool1, tool2], cancellation_token)
activate MC
MC->>MC: LLM推理并识别工具调用需求
MC->>A: 返回ChatCompletionResponse(tool_calls=[call1, call2])
deactivate MC
A->>A: 解析tool_calls列表
par 并发执行工具调用
A->>A: _execute_tool_call(tool_call1, workbench, handoff_tools, agent_name, cancellation_token, stream)
activate A
Note over A: 步骤1: 解析工具调用参数
A->>A: json.loads(tool_call.arguments)
alt JSON解析成功
Note over A: 步骤2: 检查是否为HandoffTool
A->>HT: 遍历handoff_tools检查tool_call.name
alt 是Handoff工具
A->>HT: handoff_tool.run_json(arguments, cancellation_token, call_id)
activate HT
HT->>HT: 执行切换逻辑
HT->>A: 返回HandoffResult
deactivate HT
A->>A: 创建FunctionExecutionResult(handoff_result)
else 是普通工具
Note over A: 步骤3: 使用Workbench执行工具
A->>WB: 查找匹配的工具
activate WB
WB->>T: 找到对应工具实例
A->>T: run_json(arguments, cancellation_token, call_id)
activate T
alt 工具执行成功
T->>T: 执行工具逻辑 (同步或异步)
T->>A: 返回工具执行结果
else 工具执行失败
T->>T: 捕获异常
T->>A: 返回错误结果
end
deactivate T
deactivate WB
A->>A: 包装为FunctionExecutionResult
end
else JSON解析失败
A->>A: 创建错误FunctionExecutionResult
Note over A: content=f"Error: {json_decode_error}"
end
A->>SQ: 可选的流式事件发送
A->>A: 返回(tool_call, FunctionExecutionResult)
deactivate A
and
Note over A: 同时执行tool_call2...
A->>A: _execute_tool_call(tool_call2, ...)
end
Note over A: 步骤4: 收集所有工具执行结果
A->>A: executed_calls_and_results = [(call1,result1), (call2,result2)]
Note over A: 步骤5: 根据reflect_on_tool_use决策
alt reflect_on_tool_use = True
A->>A: 将工具结果添加到model_context
A->>MC: create(messages + tool_results)
activate MC
MC->>MC: 基于工具结果生成反思响应
MC->>A: 返回最终TextMessage/StructuredMessage
deactivate MC
else reflect_on_tool_use = False
A->>A: _summarize_tool_use(executed_calls_and_results)
A->>A: 使用tool_call_summary_format格式化
A->>A: 创建ToolCallSummaryMessage
end
A->>A: 构造Response(chat_message, inner_messages)
工具执行状态机
stateDiagram-v2
[*] --> 解析参数 : 开始工具调用
解析参数 --> 参数解析失败 : JSON解析错误
解析参数 --> 检查工具类型 : 参数解析成功
检查工具类型 --> Handoff工具处理 : tool_call.name in handoff_tools
检查工具类型 --> 普通工具处理 : 普通工具调用
Handoff工具处理 --> 执行切换逻辑 : run_json()
执行切换逻辑 --> 工具执行成功 : 切换成功
执行切换逻辑 --> 工具执行失败 : 切换失败
普通工具处理 --> 查找工具实例 : 在workbench中查找
查找工具实例 --> 工具不存在 : 未找到工具
查找工具实例 --> 执行工具 : 找到工具实例
执行工具 --> 工具执行成功 : 正常执行完成
执行工具 --> 工具执行失败 : 执行异常
参数解析失败 --> 返回错误结果
工具不存在 --> 返回错误结果
工具执行失败 --> 返回错误结果
工具执行成功 --> 返回成功结果
返回错误结果 --> [*] : FunctionExecutionResult(is_error=True)
返回成功结果 --> [*] : FunctionExecutionResult(is_error=False)
4.2 工具执行错误处理
sequenceDiagram
participant A as Agent
participant EX as ExecutionEngine
participant T as Tool
participant EH as ErrorHandler
participant MON as Monitor
Note over A,MON: 工具执行错误处理流程
A->>EX: execute_tool_with_retry(tool_call)
activate EX
loop 重试循环 (最多3次)
EX->>T: 执行工具调用
activate T
alt 执行成功
T->>EX: 返回执行结果
EX->>A: 成功结果
break
else 执行失败
T->>EX: 抛出异常
deactivate T
EX->>EH: handle_tool_error(exception)
activate EH
EH->>EH: 分析错误类型
alt 可重试错误
EH->>EX: 返回重试策略
EH->>MON: 记录重试事件
EX->>EX: 等待重试延迟
Note over EX: 指数退避延迟
else 不可重试错误
EH->>EX: 返回终止信号
EH->>MON: 记录严重错误
EX->>A: 返回错误结果
break
end
deactivate EH
end
end
EX->>MON: 记录最终执行结果
deactivate EX
5. 分布式系统交互时序
5.1 Worker注册和发现
sequenceDiagram
participant W as Worker进程
participant GW as Gateway
participant REG as Registry
participant LB as LoadBalancer
participant HM as HealthMonitor
Note over W,HM: Worker注册和服务发现流程
W->>GW: 建立gRPC连接
activate GW
W->>GW: RegisterAgentType("ChatAgent")
GW->>REG: 注册代理类型
activate REG
REG->>REG: 更新代理类型映射
REG->>GW: 注册成功
deactivate REG
W->>GW: AddSubscription(subscription)
GW->>REG: 添加订阅规则
activate REG
REG->>REG: 更新订阅映射
REG->>LB: 通知负载均衡器
activate LB
LB->>LB: 更新路由表
LB->>REG: 更新完成
deactivate LB
REG->>GW: 订阅添加成功
deactivate REG
GW->>HM: 注册Worker健康检查
activate HM
HM->>HM: 启动健康监控
HM->>W: 发送心跳检查
W->>HM: 返回健康状态
deactivate HM
GW->>W: Worker注册完成
deactivate GW
Note over W,HM: 持续健康监控
loop 健康检查循环
HM->>W: 定期健康检查
activate HM
alt Worker健康
W->>HM: 返回正常状态
HM->>REG: 更新Worker状态
else Worker异常
W-->>HM: 超时或错误响应
HM->>REG: 标记Worker不可用
HM->>LB: 从负载均衡中移除
end
deactivate HM
end
5.2 容错和故障恢复
sequenceDiagram
participant C as Client
participant GW as Gateway
participant W1 as Worker1(故障)
participant W2 as Worker2(正常)
participant FM as FailureManager
participant REG as Registry
Note over C,REG: 故障检测和恢复流程
C->>GW: send_message(message, target_agent)
activate GW
GW->>REG: 查找目标Worker
activate REG
REG->>GW: 返回Worker1
deactivate REG
GW->>W1: 转发消息
activate GW
W1-->>GW: 连接超时/异常
GW->>FM: 报告Worker故障
activate FM
FM->>FM: 记录故障事件
FM->>REG: 标记Worker1为不可用
activate REG
REG->>REG: 更新Worker状态
REG->>FM: 查找备用Worker
REG->>FM: 返回Worker2
deactivate REG
FM->>GW: 使用备用Worker2
deactivate FM
GW->>W2: 转发消息到备用Worker
activate W2
W2->>W2: 处理消息
W2->>GW: 返回处理结果
deactivate W2
GW->>C: 返回处理结果
deactivate GW
Note over FM,REG: 故障恢复流程
FM->>W1: 尝试重新连接
alt Worker1恢复
W1->>FM: 连接成功
FM->>REG: 恢复Worker1状态
REG->>REG: 重新启用Worker1
else Worker1持续故障
FM->>FM: 启动新Worker实例
FM->>REG: 注册新Worker
end
6. 状态同步与一致性
6.1 分布式状态同步
sequenceDiagram
participant A1 as Agent1@Worker1
participant A2 as Agent1@Worker2
participant SM as StateManager
participant SS as StateStore
participant SC as SyncCoordinator
Note over A1,SC: 分布式状态同步流程
A1->>A1: 状态发生变更
A1->>SM: notify_state_change(delta)
activate SM
SM->>SC: 请求状态同步
activate SC
SC->>SC: 生成状态版本号
SC->>SS: 保存状态快照
activate SS
SS->>SC: 保存成功
deactivate SS
SC->>A2: 发送状态同步请求
activate A2
A2->>A2: 检查状态版本
alt 需要同步
A2->>SC: 请求状态增量
SC->>SS: 获取状态差异
activate SS
SS->>SC: 返回状态增量
deactivate SS
SC->>A2: 发送状态增量
A2->>A2: 应用状态变更
A2->>SC: 同步完成确认
else 状态已最新
A2->>SC: 无需同步
end
deactivate A2
SC->>SM: 同步完成
deactivate SC
SM->>A1: 状态同步成功
deactivate SM
6.2 事务性消息处理
sequenceDiagram
participant C as Coordinator
participant A1 as Agent1
participant A2 as Agent2
participant TM as TransactionManager
participant LOG as TransactionLog
Note over C,LOG: 分布式事务消息处理
C->>TM: begin_transaction(tx_id)
activate TM
TM->>LOG: 记录事务开始
TM->>C: 事务已启动
C->>A1: send_transactional_message(msg1, tx_id)
activate A1
A1->>TM: prepare_transaction(tx_id)
TM->>A1: 准备就绪
A1->>A1: 执行业务逻辑
A1->>TM: vote_commit(tx_id)
A1->>C: 返回处理结果
deactivate A1
C->>A2: send_transactional_message(msg2, tx_id)
activate A2
A2->>TM: prepare_transaction(tx_id)
TM->>A2: 准备就绪
A2->>A2: 执行业务逻辑
A2->>TM: vote_commit(tx_id)
A2->>C: 返回处理结果
deactivate A2
C->>TM: commit_transaction(tx_id)
alt 所有参与者投票提交
TM->>LOG: 记录提交决策
TM->>A1: commit_confirmed(tx_id)
TM->>A2: commit_confirmed(tx_id)
par 并行提交
A1->>A1: 提交本地变更
A1->>TM: 提交完成
and
A2->>A2: 提交本地变更
A2->>TM: 提交完成
end
TM->>LOG: 记录事务完成
TM->>C: 事务提交成功
else 有参与者投票回滚
TM->>LOG: 记录回滚决策
TM->>A1: rollback(tx_id)
TM->>A2: rollback(tx_id)
par 并行回滚
A1->>A1: 回滚本地变更
A1->>TM: 回滚完成
and
A2->>A2: 回滚本地变更
A2->>TM: 回滚完成
end
TM->>LOG: 记录事务回滚
TM->>C: 事务已回滚
end
deactivate TM
7. 代码执行代理时序
7.1 CodeExecutorAgent执行流程 (基于实际实现)
sequenceDiagram
participant U as 用户
participant CEA as CodeExecutorAgent
participant MC as ModelClient
participant MCT as ModelContext
participant CB as CodeBlockExtractor
participant CE as CodeExecutor
participant AR as ApprovalRequest
Note over U,AR: CodeExecutorAgent完整执行流程
U->>CEA: run_stream(task="编写并执行Python代码计算斐波那契数列")
activate CEA
CEA->>MCT: 添加用户任务到model_context
MCT->>MCT: add_message(UserMessage(task))
Note over CEA: 重试循环 (max_retries_on_error)
loop 最多重试次数
CEA->>MCT: get_messages() 获取对话历史
MCT->>CEA: 返回LLMMessage列表
Note over CEA: 步骤1-2: 模型推理
CEA->>MC: create(messages, cancellation_token)
activate MC
MC->>MC: LLM推理生成代码
MC->>CEA: 返回ChatCompletionResponse
deactivate MC
CEA->>CEA: yield CodeGenerationEvent(content, retry_attempt)
Note over CEA: 步骤3-4: 提取代码块
CEA->>CB: _extract_markdown_code_blocks(model_result.content)
activate CB
CB->>CB: 正则表达式提取```...```代码块
CB->>CEA: 返回code_blocks列表
deactivate CB
alt 无代码块
CEA->>CEA: yield Response(TextMessage(model_result.content))
Note over CEA: 直接返回文本响应,结束流程
break
end
Note over CEA: 步骤5: 代码执行前审批 (如果启用)
alt approval_func配置
CEA->>AR: approval_func(ApprovalRequest(code_blocks))
activate AR
AR->>AR: 用户确认代码执行
AR->>CEA: 返回ApprovalResponse(approved)
deactivate AR
alt 用户拒绝
CEA->>CEA: yield Response("代码执行被用户拒绝")
break
end
end
Note over CEA: 步骤6-7: 执行代码块
CEA->>CE: execute_code_block(code_blocks, cancellation_token)
activate CE
loop 对每个代码块
CE->>CE: 根据语言选择执行器
CE->>CE: 在隔离环境中执行代码
end
CE->>CEA: 返回CodeResult(output, exit_code)
deactivate CE
Note over CEA: 步骤8: 更新模型上下文
CEA->>MCT: add_message(UserMessage(execution_result.output))
CEA->>CEA: yield CodeExecutionEvent(result, retry_attempt)
Note over CEA: 步骤9: 检查执行结果
alt exit_code == 0 或 达到最大重试次数
CEA->>CEA: 执行成功或重试耗尽,结束循环
break
else 执行失败且还有重试次数
Note over CEA: 步骤10: 生成重试提示
CEA->>CEA: 构造重试prompt询问是否继续
CEA->>MC: create(messages + retry_prompt)
MC->>CEA: 返回是否重试的决策
alt 决定重试
Note over CEA: 继续下一次循环
else 决定停止
CEA->>CEA: 结束重试循环
break
end
end
end
CEA->>U: 最终返回TaskResult(所有messages)
deactivate CEA
8. 流式处理时序
8.1 流式响应生成
sequenceDiagram
participant U as 用户
participant A as Agent
participant MC as ModelClient
participant SM as StreamManager
participant C as Console
Note over U,C: 流式响应生成和显示流程
U->>A: run_stream(task="写一篇长文章")
activate A
A->>SM: 创建流式上下文
activate SM
SM->>A: 流式上下文就绪
A->>MC: create_stream(messages, tools)
activate MC
loop 流式生成循环
MC->>MC: 生成内容chunk
MC->>A: 发送StreamingChunk
A->>SM: 处理chunk
SM->>C: 发送ModelClientStreamingChunkEvent
C->>C: 实时显示内容
end
MC->>A: 流式生成完成
deactivate MC
A->>A: 构造最终Response
A->>SM: 发送最终响应
SM->>C: 发送TaskResult
deactivate SM
C->>C: 显示完成状态
A->>U: 流式处理完成
deactivate A
7.2 团队流式协作
sequenceDiagram
participant U as 用户
participant T as Team
participant A1 as Agent1
participant A2 as Agent2
participant A3 as Agent3
participant SM as StreamManager
Note over U,SM: 团队流式协作流程
U->>T: run_stream(task="协作分析项目")
activate T
T->>SM: 初始化团队流式上下文
activate SM
loop 团队协作循环
T->>A1: 分配子任务1
activate A1
A1->>A1: 处理子任务
loop Agent1流式输出
A1->>SM: 发送中间结果
SM->>U: 流式显示Agent1进度
end
A1->>T: 子任务1完成
deactivate A1
T->>A2: 基于A1结果分配子任务2
activate A2
A2->>A2: 处理子任务2
loop Agent2流式输出
A2->>SM: 发送中间结果
SM->>U: 流式显示Agent2进度
end
A2->>T: 子任务2完成
deactivate A2
T->>T: 检查终止条件
break 当满足终止条件
end
T->>A3: 最终整合任务
activate A3
A3->>A3: 整合所有结果
loop 最终输出流
A3->>SM: 发送最终内容
SM->>U: 流式显示最终结果
end
A3->>T: 整合完成
deactivate A3
T->>SM: 团队协作完成
SM->>U: 发送TaskResult
deactivate SM
T->>U: 流式协作完成
deactivate T
8. 监控和诊断时序
8.1 性能监控数据收集
sequenceDiagram
participant A as Agent
participant PM as PerformanceMonitor
participant MC as MetricsCollector
participant PROM as Prometheus
participant GRAF as Grafana
participant AM as AlertManager
Note over A,AM: 性能监控和告警流程
loop 监控循环 (每30秒)
PM->>A: 收集性能指标
activate PM
A->>PM: 返回当前指标
PM->>MC: 汇总性能数据
activate MC
MC->>MC: 计算衍生指标
MC->>PROM: 推送指标数据
activate PROM
PROM->>PROM: 存储时序数据
PROM->>MC: 推送成功
deactivate PROM
deactivate MC
PM->>PM: 检查告警阈值
alt 超过告警阈值
PM->>AM: 触发告警
activate AM
AM->>AM: 评估告警严重性
AM->>GRAF: 更新仪表板状态
AM->>AM: 发送通知
deactivate AM
end
deactivate PM
end
Note over GRAF,AM: 可视化监控流程
GRAF->>PROM: 查询历史指标
activate GRAF
PROM->>GRAF: 返回时序数据
GRAF->>GRAF: 生成图表和仪表板
deactivate GRAF
8.2 分布式链路追踪
sequenceDiagram
participant C as Client
participant G as Gateway
participant W as Worker
participant A as Agent
participant JAEGER as Jaeger
Note over C,JAEGER: 分布式链路追踪流程
C->>G: 发送请求 (生成TraceID)
activate G
G->>JAEGER: 开始根Span
activate JAEGER
G->>W: 转发请求 (传播TraceID)
activate W
W->>JAEGER: 创建子Span
W->>A: 调用代理 (传播TraceID)
activate A
A->>JAEGER: 创建代理处理Span
A->>A: 执行业务逻辑
loop 代理内部操作追踪
A->>JAEGER: 记录操作事件
JAEGER->>JAEGER: 添加Span事件
end
A->>JAEGER: 完成代理Span
A->>W: 返回处理结果
deactivate A
W->>JAEGER: 完成Worker Span
W->>G: 返回结果
deactivate W
G->>JAEGER: 完成Gateway Span
deactivate JAEGER
G->>C: 返回最终结果
deactivate G
Note over JAEGER: 链路分析和可视化
JAEGER->>JAEGER: 分析完整调用链
JAEGER->>JAEGER: 识别性能瓶颈
JAEGER->>JAEGER: 生成链路拓扑图
9. 总结
通过这些详细的时序图分析,我们可以清晰地看到AutoGen系统的核心交互模式:
9.1 关键设计模式
- 异步事件驱动:所有交互都基于异步事件,避免阻塞
- 发布订阅解耦:发布者和订阅者完全解耦,提高系统弹性
- 工厂模式延迟加载:代理按需创建,优化资源使用
- 中间件模式:可插拔的中间件支持横切关注点
- 断路器模式:故障隔离和快速恢复
9.2 性能优化要点
- 并发处理:消息处理、工具调用、状态同步都采用并发模式
- 连接复用:gRPC连接池和长连接复用
- 批处理优化:消息批处理和状态批量同步
- 缓存策略:多级缓存减少重复计算
- 资源管理:对象池和生命周期管理
9.3 可靠性保障
- 故障检测:主动健康检查和被动故障发现
- 自动恢复:Worker故障自动切换和重连
- 状态一致性:分布式状态同步和事务管理
- 监控告警:全链路监控和智能告警
- 优雅降级:服务降级和熔断保护
这些时序图为理解AutoGen的运行机制提供了清晰的视角,有助于开发者设计高效可靠的多代理应用系统。