AutoGen-01-PythonCore
模块职责
核心职责
autogen-core是AutoGen框架的底层核心模块,提供基于Actor模型的事件驱动代理系统基础设施。主要职责包括:
- 代理生命周期管理:提供代理的创建、注册、销毁和状态管理
- 消息传递系统:实现可靠的异步消息传递和发布-订阅机制
- 运行时管理:提供本地单线程和分布式运行时环境
- 序列化框架:支持多种消息序列化格式(JSON、Protobuf)
- 订阅路由机制:基于类型和主题的消息路由和订阅管理
输入与输出
- 输入:用户定义的代理类、消息类型、订阅规则、运行时配置
- 输出:运行中的代理实例、消息传递结果、代理状态快照
上下游依赖
- 上游依赖:Python asyncio、gRPC、Protobuf、Pydantic
- 下游调用方:autogen-agentchat、用户自定义代理、第三方扩展
生命周期
初始化 → 代理注册 → 运行时启动 → 消息处理 → 状态管理 → 优雅关闭
模块架构图
flowchart TB
subgraph "用户代理层"
UserAgent[用户自定义代理<br/>继承BaseAgent/RoutedAgent]
CustomLogic[业务逻辑处理<br/>@event/@rpc装饰器]
end
subgraph "代理抽象层"
Agent[Agent Protocol<br/>代理接口定义]
BaseAgent[BaseAgent<br/>基础代理实现]
RoutedAgent[RoutedAgent<br/>路由代理实现]
end
subgraph "运行时层"
AgentRuntime[AgentRuntime Protocol<br/>运行时接口]
STRuntime[SingleThreadedAgentRuntime<br/>单线程运行时]
DistRuntime[DistributedAgentRuntime<br/>分布式运行时]
end
subgraph "消息处理层"
MessageQueue[消息队列<br/>异步消息缓冲]
MessageRouter[消息路由器<br/>基于类型的路由]
MessageHandler[消息处理器<br/>装饰器注册]
end
subgraph "订阅管理层"
SubscriptionManager[订阅管理器<br/>订阅关系维护]
TypeSubscription[类型订阅<br/>基于消息类型]
TopicSubscription[主题订阅<br/>基于主题ID]
end
subgraph "序列化层"
SerializationRegistry[序列化注册表<br/>格式映射管理]
JSONSerializer[JSON序列化器<br/>默认格式]
ProtobufSerializer[Protobuf序列化器<br/>跨语言支持]
end
subgraph "基础设施层"
AgentId[AgentId<br/>代理唯一标识]
MessageContext[MessageContext<br/>消息上下文]
CancellationToken[CancellationToken<br/>取消令牌]
end
UserAgent --> BaseAgent
UserAgent --> RoutedAgent
CustomLogic --> MessageHandler
BaseAgent --> Agent
RoutedAgent --> BaseAgent
Agent --> AgentRuntime
STRuntime --> AgentRuntime
DistRuntime --> AgentRuntime
STRuntime --> MessageQueue
MessageQueue --> MessageRouter
MessageRouter --> MessageHandler
STRuntime --> SubscriptionManager
SubscriptionManager --> TypeSubscription
SubscriptionManager --> TopicSubscription
MessageHandler --> SerializationRegistry
SerializationRegistry --> JSONSerializer
SerializationRegistry --> ProtobufSerializer
MessageContext --> AgentId
STRuntime --> CancellationToken
架构要点说明
分层设计:
- 用户代理层:用户实现具体业务逻辑,通过继承和装饰器定义行为
- 代理抽象层:提供统一的代理接口和基础实现,支持多种代理模式
- 运行时层:管理代理生命周期和消息传递,支持本地和分布式部署
- 消息处理层:异步消息队列和路由机制,确保消息可靠传递
- 订阅管理层:灵活的订阅机制,支持基于类型和主题的消息路由
- 序列化层:多格式序列化支持,确保跨语言兼容性
扩展点设计:
- 代理可继承BaseAgent或RoutedAgent实现不同的消息处理策略
- 序列化器可插拔,支持自定义消息格式
- 订阅机制可扩展,支持复杂路由规则
- 运行时可替换,支持不同的部署模式
状态管理:
- 代理状态通过save_state/load_state进行持久化
- 运行时状态包含所有代理的完整状态快照
- 消息传递状态通过CancellationToken进行取消控制
资源占用要点:
- 单个代理实例基础内存占用:< 1MB
- 消息队列默认深度:10000条消息
- 并发处理上限:基于asyncio事件循环的协程数量限制
核心数据结构
UML类图
classDiagram
class Agent {
<<interface>>
+AgentMetadata metadata
+AgentId id
+bind_id_and_runtime(id, runtime) None
+on_message(message, ctx) Any
+save_state() Mapping[str, Any]
+load_state(state) None
+close() None
}
class BaseAgent {
<<abstract>>
-AgentId _id
-AgentRuntime _runtime
-str _description
+__init__(description)
+bind_id_and_runtime(id, runtime) None
+send_message(message, recipient) Any
+publish_message(message, topic_id) None
+register(runtime, type, factory) AgentType*
+register_instance(runtime, agent_id) AgentId*
#on_message_impl(message, ctx) Any*
}
class RoutedAgent {
-DefaultDict _handlers
+__init__(description)
+on_message_impl(message, ctx) Any
+on_unhandled_message(message, ctx) None
+_discover_handlers() Sequence[MessageHandler]*
}
class AgentId {
-str type
-str key
+__init__(type, key)
+__str__() str
+__eq__(other) bool
+__hash__() int
}
class AgentMetadata {
+str key
+str type
+str description
}
class MessageContext {
+AgentId|None sender
+TopicId|None topic_id
+bool is_rpc
+CancellationToken cancellation_token
+str message_id
}
class AgentRuntime {
<<interface>>
+send_message(message, recipient) Any
+publish_message(message, topic_id) None
+register_factory(type, factory) AgentType
+register_agent_instance(agent, id) AgentId
+add_subscription(subscription) None
+save_state() Mapping[str, Any]
+load_state(state) None
}
class SingleThreadedAgentRuntime {
-Queue _message_queue
-Dict[str, Agent] _agents
-Dict[str, Callable] _agent_factories
-SubscriptionManager _subscription_manager
-SerializationRegistry _serialization_registry
-Task _message_processor_task
+start() None
+stop() None
+send_message(message, recipient) Any
+publish_message(message, topic_id) None
-_process_send(envelope) None
-_process_publish(envelope) None
-_get_agent(agent_id) Agent
}
class MessageHandler {
<<protocol>>
+Sequence[type] target_types
+Sequence[type] produces_types
+bool is_message_handler
+Callable router
+__call__(agent, message, ctx) Any
}
class Subscription {
<<interface>>
+str id
+is_match(topic_id) bool
+map_to_agent(topic_id) AgentId
}
class TypeSubscription {
-str _topic_type
-str _agent_type
-str _id
+is_match(topic_id) bool
+map_to_agent(topic_id) AgentId
}
class TopicId {
+str type
+str source
+__init__(type, source)
}
class CancellationToken {
-bool _cancelled
-List[Future] _linked_futures
+is_cancelled() bool
+cancel() None
+link_future(future) None
}
class MessageSerializer {
<<protocol>>
+str data_content_type
+str type_name
+serialize(message) bytes
+deserialize(payload) Any
}
Agent <|.. BaseAgent : implements
BaseAgent <|-- RoutedAgent : extends
Agent --> AgentId : has
Agent --> AgentMetadata : has
BaseAgent --> AgentRuntime : uses
RoutedAgent --> MessageHandler : contains
AgentRuntime <|.. SingleThreadedAgentRuntime : implements
SingleThreadedAgentRuntime --> Subscription : manages
Subscription <|.. TypeSubscription : implements
MessageContext --> AgentId : references
MessageContext --> TopicId : references
MessageContext --> CancellationToken : contains
SingleThreadedAgentRuntime --> MessageSerializer : uses
代理标识体系
AgentId - 代理唯一标识
字段说明:
type:代理类型名称,用于工厂方法查找和路由key:代理实例键值,同类型下的唯一标识符
约束规则:
- type必须是有效的Python标识符格式(字母、数字、下划线、短横线)
- key可以是任意字符串,建议使用UUID或语义化名称
- 组合(type, key)在运行时内必须全局唯一
使用模式:
# 按类型创建默认实例
agent_id = AgentId(type="chat_bot", key="default")
# 按业务场景创建多实例
user_agent = AgentId(type="user_handler", key=f"user_{user_id}")
session_agent = AgentId(type="session_manager", key=f"session_{session_id}")
AgentMetadata - 代理元数据
字段说明:
key:来自AgentId.key,实例级标识type:来自AgentId.type,类型级标识description:人类可读的代理描述
用途:
- 运行时代理发现和枚举
- 调试和监控中的代理识别
- 管理界面的代理信息展示
消息传递体系
MessageContext - 消息上下文
字段说明:
sender:发送方代理ID,可为None(外部发送)topic_id:主题标识,仅发布消息时非Noneis_rpc:标识是否为同步RPC调用cancellation_token:取消令牌,支持操作中断message_id:消息唯一标识,用于追踪和去重
上下文语义:
# RPC上下文特征
rpc_context = MessageContext(
sender=AgentId("caller", "instance1"),
topic_id=None, # RPC无主题
is_rpc=True, # 期待返回值
message_id="rpc_12345"
)
# 事件上下文特征
event_context = MessageContext(
sender=AgentId("publisher", "instance1"),
topic_id=TopicId("user_events", "user_123"),
is_rpc=False, # 无返回值期待
message_id="event_67890"
)
TopicId - 主题标识
字段说明:
type:主题类型,用于订阅匹配source:主题来源,用于代理实例路由
路由机制:
- 订阅者按type进行粗粒度匹配
- 根据source创建或路由到对应代理实例
- 支持层级命名空间(如user.login、user.logout)
运行时架构
SingleThreadedAgentRuntime - 单线程运行时
核心数据结构:
_message_queue:异步消息队列,FIFO处理模式_agents:代理实例缓存,按AgentId索引_agent_factories:代理工厂注册表,按type索引_subscription_manager:订阅关系管理器_serialization_registry:消息序列化器注册表
消息处理流水线:
消息投递 → 队列缓冲 → 类型检查 → 实例获取 → 处理器调用 → 结果返回
并发控制:
- 基于asyncio单线程事件循环
- 消息按序处理,保证因果顺序
- 支持协程级并发(同一代理可并发处理多个消息)
订阅路由体系
Subscription接口族
TypeSubscription - 类型订阅:
- 匹配规则:topic_id.type == subscription.topic_type
- 映射规则:AgentId(agent_type, topic_id.source)
- 使用场景:按用户、会话、设备等维度分片的代理
TypePrefixSubscription - 前缀订阅:
- 匹配规则:topic_id.type.startswith(prefix)
- 映射规则:固定代理实例
- 使用场景:处理一类相关事件的单例代理
消息序列化框架
MessageSerializer接口
标准实现:
JSONMessageSerializer:基于JSON,人类可读ProtobufMessageSerializer:基于Protobuf,跨语言兼容PickleMessageSerializer:基于Pickle,Python专用高效格式
序列化策略:
# 自动类型推导
@dataclass
class UserMessage:
content: str
timestamp: datetime
# 序列化器自动注册
serializer = JSONMessageSerializer(UserMessage)
runtime.add_message_serializer(serializer)
# 运行时自动选择合适的序列化器
data = serializer.serialize(UserMessage("hello", datetime.now()))
message = serializer.deserialize(data)
典型场景时序图
场景1:代理注册与启动
sequenceDiagram
autonumber
participant User as 用户代码
participant Runtime as SingleThreadedAgentRuntime
participant AgentFactory as 代理工厂
participant Agent as 代理实例
participant SubManager as 订阅管理器
User->>Runtime: 创建运行时实例
User->>Runtime: register_factory("chat_bot", factory)
Runtime->>Runtime: 存储工厂函数到_agent_factories
User->>Runtime: start()
Runtime->>Runtime: 启动消息处理协程
Runtime->>Runtime: _message_processor_task = create_task()
Note over Runtime: 运行时进入消息监听状态
User->>Runtime: get("chat_bot", "user_123")
Runtime->>Runtime: 检查_agents缓存
Runtime->>AgentFactory: 调用工厂函数创建代理
AgentFactory-->>Agent: 返回代理实例
Runtime->>Agent: bind_id_and_runtime(agent_id, self)
Agent->>Agent: 设置_id和_runtime属性
Runtime->>Runtime: 缓存到_agents[agent_id]
Runtime-->>User: 返回agent_id
关键步骤分析: 1-3. 工厂注册阶段:用户注册代理类型和对应的工厂函数,运行时维护类型到工厂的映射关系 4-6. 运行时启动:创建消息处理协程,运行时进入活跃状态等待消息 7-12. 按需实例化:首次访问代理时通过工厂创建实例,并建立代理与运行时的绑定关系
边界条件:
- 工厂函数调用失败时,运行时抛出AgentConstructionException
- 代理绑定过程中如果ID冲突,抛出RuntimeError
- 运行时未启动时调用get()方法会阻塞直到启动完成
性能要点:
- 工厂调用是同步操作,复杂初始化逻辑会阻塞其他代理创建
- 代理实例全局唯一,相同AgentId的重复get()调用直接返回缓存实例
- 订阅关系在代理注册时建立,后续消息路由无需重新计算
场景2:直接消息发送(RPC模式)
sequenceDiagram
autonumber
participant Sender as 发送方代理
participant Runtime as SingleThreadedAgentRuntime
participant Queue as 消息队列
participant Processor as 消息处理器
participant Receiver as 接收方代理
participant LLM as LLM服务
Sender->>Runtime: send_message(message, recipient_id)
Runtime->>Runtime: 创建SendMessageEnvelope
Runtime->>Queue: put(envelope)
Runtime->>Sender: 返回Future等待结果
par 异步消息处理
Processor->>Queue: get()获取消息信封
Processor->>Runtime: _process_send(envelope)
Runtime->>Runtime: _get_agent(recipient_id)
Runtime->>Receiver: 获取或创建代理实例
Runtime->>Receiver: on_message(message, context)
Receiver->>Receiver: 路由到具体处理器方法
alt 需要调用LLM
Receiver->>LLM: 调用模型API
LLM-->>Receiver: 返回生成结果
end
Receiver-->>Runtime: 返回处理结果
Runtime->>Runtime: envelope.future.set_result(response)
end
Runtime-->>Sender: 返回最终响应结果
消息流转过程: 1-4. 消息投递阶段:发送方将消息封装投递到异步队列,立即获得Future用于等待结果 5-8. 目标解析阶段:消息处理器从队列取出消息,定位目标代理实例(延迟创建) 9-11. 消息路由阶段:构造消息上下文,调用代理的消息处理入口 12-15. 业务处理阶段:根据消息类型路由到具体处理器,可能涉及外部服务调用 16-17. 结果返回阶段:处理结果通过Future机制异步返回给发送方
异常处理路径:
- 目标代理不存在:LookupError异常设置到Future
- 消息类型不支持:CantHandleException传播到发送方
- 处理器执行异常:原始异常包装后设置到Future
- 取消令牌触发:CancelledError中断处理流程
性能关键点:
- 消息投递是O(1)操作,不受系统负载影响
- 代理实例化可能涉及磁盘I/O(状态加载),建议预热
- LLM调用是性能瓶颈,建议配置超时和重试机制
- Future等待支持异步,不会阻塞其他消息处理
场景3:事件发布订阅
sequenceDiagram
autonumber
participant Publisher as 发布者代理
participant Runtime as SingleThreadedAgentRuntime
participant SubManager as 订阅管理器
participant Subscriber1 as 订阅者A
participant Subscriber2 as 订阅者B
participant Subscriber3 as 订阅者C
Publisher->>Runtime: publish_message(event, topic_id)
Runtime->>Runtime: 创建PublishMessageEnvelope
Runtime->>Runtime: 投递到消息队列
Runtime->>SubManager: get_subscribed_recipients(topic_id)
SubManager->>SubManager: 遍历订阅列表匹配topic_id
SubManager->>SubManager: 调用subscription.is_match(topic_id)
SubManager->>SubManager: 调用subscription.map_to_agent(topic_id)
SubManager-->>Runtime: 返回订阅者代理ID列表
par 并行投递给所有订阅者
Runtime->>Subscriber1: on_message(event, context)
Subscriber1->>Subscriber1: @event处理器处理
Subscriber1-->>Runtime: 处理完成(无返回值)
and
Runtime->>Subscriber2: on_message(event, context)
Subscriber2->>Subscriber2: @event处理器处理
Subscriber2-->>Runtime: 处理完成(无返回值)
and
Runtime->>Subscriber3: on_message(event, context)
Subscriber3->>Subscriber3: @event处理器处理
Subscriber3-->>Runtime: 处理完成(无返回值)
end
Runtime-->>Publisher: 发布完成确认
发布流程分析: 1-3. 消息发布阶段:发布者调用publish_message,运行时创建发布消息信封并投递队列 4-7. 订阅者发现:订阅管理器根据主题ID匹配所有相关订阅,映射到具体代理实例 8-14. 并行广播阶段:运行时并发调用所有订阅者的消息处理器,事件处理无返回值要求
订阅匹配机制:
- TypeSubscription:topic_id.type精确匹配,按topic_id.source创建代理实例
- TypePrefixSubscription:topic_id.type前缀匹配,路由到固定代理实例
- 自定义订阅:用户定义的匹配逻辑和代理映射规则
并发处理特征:
- 所有订阅者并行处理,单个订阅者异常不影响其他订阅者
- 事件处理采用"fire-and-forget"模式,发布者不等待处理结果
- 异常订阅者的错误会记录日志但不传播给发布者
性能考量:
- 订阅者数量直接影响并发度,建议监控协程池使用率
- 大量订阅者场景下可能需要分批处理避免资源耗尽
- 订阅关系变更频繁时需要考虑索引重建开销
场景4:代理状态保存与恢复
sequenceDiagram
autonumber
participant User as 用户代码
participant Runtime as SingleThreadedAgentRuntime
participant Agent1 as 代理实例1
participant Agent2 as 代理实例2
participant Storage as 存储系统
participant Factory as 代理工厂
Note over User,Storage: 状态保存流程
User->>Runtime: save_state()
Runtime->>Runtime: 遍历所有活跃代理实例
par 并行保存各代理状态
Runtime->>Agent1: save_state()
Agent1->>Agent1: 序列化内部状态
Agent1-->>Runtime: 返回状态字典
and
Runtime->>Agent2: save_state()
Agent2->>Agent2: 序列化内部状态
Agent2-->>Runtime: 返回状态字典
end
Runtime->>Runtime: 组装全局状态快照
Runtime->>Storage: 写入状态到持久存储
Storage-->>Runtime: 确认写入成功
Runtime-->>User: 返回状态保存结果
Note over User,Storage: 状态恢复流程
User->>Runtime: load_state(saved_state)
Runtime->>Storage: 从持久存储读取状态
Storage-->>Runtime: 返回完整状态数据
Runtime->>Runtime: 解析代理状态映射
loop 遍历每个代理状态
Runtime->>Factory: 通过工厂重建代理实例
Factory-->>Runtime: 返回新代理实例
Runtime->>Agent1: load_state(agent_specific_state)
Agent1->>Agent1: 反序列化并恢复内部状态
Agent1-->>Runtime: 确认状态加载完成
Runtime->>Runtime: 重建订阅关系
end
Runtime-->>User: 状态恢复完成
状态保存过程: 1-2. 保存触发:用户调用运行时的save_state方法,开始全局状态快照 3-8. 并行状态收集:运行时并发调用所有活跃代理的save_state方法,收集个体状态 9-12. 持久化存储:将聚合后的全局状态写入持久存储系统
状态恢复过程: 13-15. 数据读取:从持久存储读取完整的状态快照数据 16-23. 逐个重建:按照保存的代理类型和状态信息,通过工厂方法重建代理实例 24. 关系重建:恢复代理间的订阅关系和运行时绑定
状态一致性保证:
- 原子性:整个状态保存过程中暂停消息处理,确保快照一致性
- 版本控制:状态数据包含版本号,支持向后兼容的数据迁移
- 增量恢复:支持仅恢复部分代理状态,其他代理按需创建
异常处理策略:
- 单个代理状态保存失败不影响其他代理,但会在全局状态中标记
- 状态恢复过程中的代理重建失败会跳过该代理并记录错误
- 存储系统不可用时提供内存状态的临时保存机制
场景5:运行时启动与关闭
sequenceDiagram
autonumber
participant User as 用户代码
participant Runtime as SingleThreadedAgentRuntime
participant Queue as 消息队列
participant Processor as 消息处理器协程
participant Agent as 活跃代理们
participant Resources as 外部资源
Note over User,Resources: 运行时启动流程
User->>Runtime: start()
Runtime->>Queue: 初始化消息队列
Queue-->>Runtime: 队列创建完成
Runtime->>Processor: 创建消息处理协程
Processor->>Processor: 进入消息监听循环
Processor->>Queue: 等待消息到达
Runtime->>Runtime: 设置运行状态为RUNNING
Runtime-->>User: 启动完成确认
Note over User,Resources: 正常消息处理期间
loop 持续消息处理
Queue->>Processor: 接收到新消息
Processor->>Agent: 路由消息到目标代理
Agent->>Resources: 可能的外部资源调用
Resources-->>Agent: 返回处理结果
Agent-->>Processor: 消息处理完成
Processor->>Queue: 继续监听下一消息
end
Note over User,Resources: 优雅关闭流程
User->>Runtime: stop()
Runtime->>Runtime: 设置停止信号
Runtime->>Queue: 停止接受新消息
Runtime->>Processor: 等待当前消息处理完成
Processor->>Processor: 完成队列中剩余消息
Processor-->>Runtime: 处理器协程退出
par 并行关闭所有代理
Runtime->>Agent: close()调用
Agent->>Agent: 清理内部资源
Agent->>Resources: 关闭外部连接
Agent-->>Runtime: 代理关闭完成
end
Runtime->>Queue: 清理消息队列
Runtime->>Runtime: 设置运行状态为STOPPED
Runtime-->>User: 关闭完成确认
启动流程要点: 1-4. 基础设施初始化:创建消息队列和处理协程,建立异步处理管道 5-8. 监听循环启动:消息处理器进入持续监听状态,等待消息到达 9-10. 状态确认:运行时标记为活跃状态,对外提供服务就绪信号
消息处理循环: 11-18. 稳态运行期:持续的消息接收、路由、处理、响应循环
- 支持高并发消息处理(协程级并发)
- 自动负载均衡和背压控制
- 异常隔离确保单个消息失败不影响系统
优雅关闭流程: 19-21. 停止信号:用户触发停止,运行时拒绝新消息但继续处理已有消息 22-24. 消息处理完成:等待消息队列清空,确保没有丢失的消息 25-29. 资源清理:并发关闭所有代理实例,释放外部资源连接 30-32. 最终清理:清理内部数据结构,标记运行时为已停止状态
边界条件处理:
- 启动失败:任何初始化步骤失败都会回滚已完成的步骤
- 强制停止:提供force_stop()方法用于紧急情况下的立即停止
- 重复操作:多次调用start()或stop()是安全的(幂等性)
性能与可靠性:
- 启动时间与注册的代理工厂数量无关(延迟实例化)
- 关闭时间与活跃代理数量和其清理复杂度相关
- 支持超时机制防止异常代理阻塞整个关闭流程
核心算法与流程
消息传递算法
async def send_message(self, message, recipient, sender=None, cancellation_token=None):
# 1. 消息预处理和验证
if cancellation_token is None:
cancellation_token = CancellationToken()
# 2. 消息序列化和包装
message_envelope = SendMessageEnvelope(
message=message,
recipient=recipient,
sender=sender,
future=asyncio.Future(),
cancellation_token=cancellation_token
)
# 3. 投递到异步消息队列
await self._message_queue.put(message_envelope)
# 4. 等待处理结果
return await message_envelope.future
算法特点:
- 异步非阻塞:使用asyncio.Future实现异步等待
- 可取消操作:通过CancellationToken支持操作取消
- 错误隔离:消息处理异常不会影响发送方
- 时间复杂度:O(1)消息投递,O(n)消息处理(n为处理步骤数)
订阅路由算法
async def get_subscribed_recipients(self, topic_id: TopicId) -> List[AgentId]:
recipients = []
for subscription in self._subscriptions.values():
# 1. 类型匹配检查
if subscription.is_match(topic_id):
# 2. 代理ID映射
agent_id = subscription.map_to_agent(topic_id)
recipients.append(agent_id)
return recipients
算法特点:
- 双层匹配:先按类型匹配订阅,再映射到具体代理
- 动态路由:支持运行时订阅规则变更
- 批量投递:一次发布可触发多个代理处理
- 时间复杂度:O(m)(m为订阅数量)
代理发现与实例化
async def _get_agent(self, agent_id: AgentId) -> Agent:
# 1. 实例缓存查找
if agent_id in self._agents:
return self._agents[agent_id]
# 2. 工厂方法创建
if agent_id.type in self._agent_factories:
with AgentInstantiationContext.populate_context(agent_id, self):
agent = await self._agent_factories[agent_id.type]()
await agent.bind_id_and_runtime(agent_id, self)
self._agents[agent_id] = agent
return agent
raise LookupError(f"Agent {agent_id} not found")
算法特点:
- 延迟实例化:按需创建代理实例,节省内存
- 实例复用:相同AgentId的代理实例全局唯一
- 上下文注入:实例化过程中注入运行时上下文
- 异常安全:实例化失败不影响其他代理
性能特征与优化
吞吐量指标
- 消息处理速率:单线程模式下 > 10,000 msg/s
- 代理并发数:理论无限制(受内存限制)
- 订阅数量:建议 < 10,000个订阅规则
延迟特征
- 本地消息传递:< 1ms(不含业务处理时间)
- 代理实例化:< 10ms(简单代理)
- 状态序列化:< 100ms(10MB状态数据)
资源限制
- 内存占用:每个代理实例 < 1MB基础内存
- 文件句柄:主要用于日志和网络连接
- 线程使用:单线程事件循环模式
扩展性考虑
- 水平扩展:需要分布式运行时支持
- 垂直扩展:受Python GIL限制,CPU密集型任务需要进程池
- 状态分片:大型状态需要分解为多个代理
内存优化策略
批量处理:
# 消息批量投递减少队列开销
async def batch_publish(messages: List[Message], topic_id: TopicId):
batch_envelope = BatchPublishEnvelope(
messages=messages,
topic_id=topic_id,
timestamp=time.time()
)
await self._message_queue.put(batch_envelope)
状态分片:
# 大状态对象分解为多个小状态
class ShardedAgent(BaseAgent):
async def save_state(self) -> Mapping[str, Any]:
return {
"shard_1": self._state_shard_1,
"shard_2": self._state_shard_2,
# 按需加载的分片
"shard_meta": {"active_shards": ["shard_1", "shard_2"]}
}