概述

autogen-core是AutoGen Python实现的核心包,提供了代理系统的基础抽象和运行时环境。本文将深入分析其核心组件的设计理念、实现细节和关键代码路径。

1. 核心架构设计

1.1 模块组织结构

graph TB subgraph "autogen-core 核心模块架构" subgraph "代理抽象层" A[Agent - 代理协议] BA[BaseAgent - 基础代理] RA[RoutedAgent - 路由代理] CA[ClosureAgent - 闭包代理] end subgraph "运行时层" AR[AgentRuntime - 运行时协议] STAR[SingleThreadedAgentRuntime - 单线程运行时] RC[RunContext - 运行上下文] end subgraph "消息处理层" MC[MessageContext - 消息上下文] MH[MessageHandler - 消息处理器] MHC[MessageHandlerContext - 处理器上下文] end subgraph "订阅与路由" S[Subscription - 订阅协议] TS[TypeSubscription - 类型订阅] TPS[TypePrefixSubscription - 前缀订阅] DS[DefaultSubscription - 默认订阅] end subgraph "标识与主题" AI[AgentId - 代理标识] AT[AgentType - 代理类型] TI[TopicId - 主题标识] end subgraph "序列化与通信" MS[MessageSerializer - 消息序列化器] SR[SerializationRegistry - 序列化注册表] UP[UnknownPayload - 未知载荷] end subgraph "工具与组件" T[Tool - 工具抽象] FT[FunctionTool - 函数工具] CC[ComponentConfig - 组件配置] end end %% 依赖关系 BA --> A RA --> BA CA --> BA STAR --> AR RC --> STAR MH --> MC MHC --> MH TS --> S TPS --> S DS --> S AI --> AT TI --> AI MS --> SR UP --> MS FT --> T CC --> FT style A fill:#e1f5fe style STAR fill:#f3e5f5 style S fill:#e8f5e8 style MS fill:#fff3e0

1.2 核心设计模式

协议驱动设计 (Protocol-Driven Design)

使用Python的Protocol定义接口规范,提供类型安全和灵活性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@runtime_checkable  
class Agent(Protocol):
    """代理协议定义"""
    async def on_message(self, message: Any, ctx: MessageContext) -> Any:
        """处理消息的核心方法"""
        ...

@runtime_checkable
class AgentRuntime(Protocol):
    """运行时协议定义"""
    async def send_message(self, message: Any, recipient: AgentId, ...) -> Any:
        """发送消息到指定代理"""
        ...
    
    async def publish_message(self, message: Any, topic_id: TopicId, ...) -> None:
        """发布消息到主题"""
        ...

装饰器模式 (Decorator Pattern)

使用装饰器简化消息处理器的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class ChatAgent(RoutedAgent):
    @message_handler  # 通用消息处理装饰器
    async def handle_text_message(self, message: TextMessage, ctx: MessageContext) -> str:
        return f"收到消息: {message.content}"
    
    @event  # 事件处理装饰器  
    async def handle_notification(self, event: NotificationEvent, ctx: MessageContext) -> None:
        print(f"收到事件: {event.type}")
    
    @rpc  # RPC调用装饰器
    async def process_request(self, request: ProcessRequest, ctx: MessageContext) -> ProcessResponse:
        result = await self.complex_processing(request.data)
        return ProcessResponse(result=result)

2. 核心组件详解

2.1 代理标识系统 (AgentId)

设计理念

代理标识采用类型化命名空间设计,确保全局唯一性和类型安全:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class AgentId:
    """代理ID由类型和键组成,形成唯一标识"""
    
    def __init__(self, type: str | AgentType, key: str) -> None:
        if isinstance(type, AgentType):
            type = type.type
            
        # 验证类型格式:只允许字母、数字、下划线、连字符和点
        if not is_valid_agent_type(type):
            raise ValueError(f"Invalid agent type: {type}")
            
        self._type = type  # 代理类型,如 "ChatAgent"
        self._key = key    # 代理实例键,如 "user_123"

关键功能实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def is_valid_agent_type(value: str) -> bool:
    """验证代理类型格式
    
    允许的字符:字母、数字、下划线(_)、连字符(-)、点(.)
    正则表达式:^[\w\-\.]+\Z
    """
    return bool(re.match(r"^[\w\-\.]+\Z", value))

class AgentId:
    def __hash__(self) -> int:
        """支持作为字典键"""
        return hash((self._type, self._key))
    
    def __str__(self) -> str:
        """字符串表示:type/key"""
        return f"{self._type}/{self._key}"
    
    @classmethod  
    def from_str(cls, agent_id: str) -> Self:
        """从字符串解析AgentId:'ChatAgent/user_123'"""
        items = agent_id.split("/", maxsplit=1)
        if len(items) != 2:
            raise ValueError(f"Invalid agent id: {agent_id}")
        type, key = items[0], items[1]
        return cls(type, key)

2.2 主题标识系统 (TopicId)

CloudEvents兼容设计

遵循CloudEvents规范,提供标准化的事件标识:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@dataclass(eq=True, frozen=True)
class TopicId:
    """主题标识,兼容CloudEvents规范"""
    
    type: str    # 事件类型,如 "com.microsoft.autogen.chat.message"
    source: str  # 事件源,如 "agent://ChatAgent/user_123"
    
    def __post_init__(self) -> None:
        """验证主题类型格式"""
        if not is_valid_topic_type(self.type):
            raise ValueError(f"Invalid topic type: {self.type}")

def is_valid_topic_type(value: str) -> bool:
    """验证主题类型格式
    
    允许的字符:字母、数字、下划线(_)、连字符(-)、点(.)、冒号(:)、等号(=)
    正则表达式:^[\w\-\.\:\=]+\Z
    """
    return bool(re.match(r"^[\w\-\.\:\=]+\Z", value))

主题命名约定

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 系统级事件
SYSTEM_STARTUP = TopicId("com.microsoft.autogen.system.startup", "runtime://system")
SYSTEM_SHUTDOWN = TopicId("com.microsoft.autogen.system.shutdown", "runtime://system")

# 代理生命周期事件  
AGENT_CREATED = TopicId("com.microsoft.autogen.agent.created", "agent://ChatAgent/user_123")
AGENT_DESTROYED = TopicId("com.microsoft.autogen.agent.destroyed", "agent://ChatAgent/user_123")

# 业务事件
CHAT_MESSAGE = TopicId("com.example.chat.message", "agent://ChatAgent/user_123")
TASK_COMPLETED = TopicId("com.example.task.completed", "agent://TaskAgent/task_456")

2.3 消息处理机制

消息上下文 (MessageContext)

封装消息处理所需的所有上下文信息:

1
2
3
4
5
6
7
8
@dataclass
class MessageContext:
    """消息处理上下文"""
    sender: AgentId | None          # 发送方代理ID
    topic_id: TopicId | None        # 主题ID(发布消息时使用)  
    is_rpc: bool                    # 是否为RPC调用
    cancellation_token: CancellationToken  # 取消令牌
    message_id: str                 # 消息唯一标识

消息处理器装饰器实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def message_handler(
    func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None,
    *,
    strict: bool = True,  # 严格类型检查
    match: None | Callable[[ReceivesT, MessageContext], bool] = None,  # 二次路由匹配
) -> MessageHandler[AgentT, ReceivesT, ProducesT]:
    """消息处理器装饰器
    
    Args:
        func: 被装饰的异步方法
        strict: 启用严格类型检查
        match: 二次路由匹配函数
    """
    
    def decorator(func: Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]) -> MessageHandler[AgentT, ReceivesT, ProducesT]:
        # 提取类型信息
        type_hints = get_type_hints(func)
        target_types = get_types(type_hints["message"])  # 输入类型
        return_types = get_types(type_hints["return"])   # 返回类型
        
        @wraps(func)
        async def wrapper(self: AgentT, message: ReceivesT, ctx: MessageContext) -> ProducesT:
            # 类型检查
            if type(message) not in target_types:
                if strict:
                    raise CantHandleException(f"消息类型 {type(message)} 不在目标类型 {target_types} 中")
                else:
                    logger.warning(f"消息类型不匹配: {type(message)} not in {target_types}")
            
            # 执行处理逻辑
            return_value = await func(self, message, ctx)
            
            # 返回类型检查
            if return_value is not None and type(return_value) not in return_types:
                if strict:
                    raise RuntimeError(f"返回类型 {type(return_value)} 不在预期类型 {return_types} 中")
                else:
                    logger.warning(f"返回类型不匹配: {type(return_value)} not in {return_types}")
            
            return return_value
        
        # 添加元数据
        wrapper.target_types = target_types
        wrapper.produces_types = return_types
        wrapper.is_message_handler = True
        wrapper.router = match or (lambda message, ctx: True)
        
        return cast(MessageHandler[AgentT, ReceivesT, ProducesT], wrapper)
    
    return decorator if func is None else decorator(func)

专用装饰器实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def event(func: Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]) -> MessageHandler[AgentT, ReceivesT, None]:
    """事件处理装饰器 - 无返回值"""
    return message_handler(func)

def rpc(
    func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None,
    *,
    strict: bool = True,
    match: None | Callable[[ReceivesT, MessageContext], bool] = None,
) -> MessageHandler[AgentT, ReceivesT, ProducesT]:
    """RPC调用装饰器 - 必须有返回值"""
    
    def decorator(func: Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]) -> MessageHandler[AgentT, ReceivesT, ProducesT]:
        handler = message_handler(func, strict=strict, match=match)
        
        # 验证RPC方法必须有返回值
        type_hints = get_type_hints(func)
        return_type = type_hints.get("return")
        if return_type is None or return_type is type(None):
            raise AssertionError("RPC方法必须有返回值")
        
        return handler
    
    return decorator if func is None else decorator(func)

2.4 订阅机制

订阅协议定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@runtime_checkable
class Subscription(Protocol):
    """订阅协议定义"""
    
    @property
    def id(self) -> str:
        """订阅唯一标识"""
        ...
    
    def is_match(self, topic_id: TopicId) -> bool:
        """检查主题ID是否匹配此订阅"""
        ...
    
    def map_to_agent(self, topic_id: TopicId) -> AgentId:
        """将主题ID映射到代理ID"""
        ...

类型订阅实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@dataclass
class TypeSubscription:
    """类型订阅 - 精确匹配主题类型"""
    
    id: str
    topic_type: str      # 要匹配的主题类型
    agent_type: str      # 目标代理类型
    
    def is_match(self, topic_id: TopicId) -> bool:
        """精确匹配主题类型"""
        return topic_id.type == self.topic_type
    
    def map_to_agent(self, topic_id: TopicId) -> AgentId:
        """映射到指定代理类型的默认实例"""
        if not self.is_match(topic_id):
            raise CantHandleException(f"主题 {topic_id} 不匹配订阅 {self.topic_type}")
        return AgentId(self.agent_type, "default")

# 使用示例
chat_subscription = TypeSubscription(
    id="chat-subscription-001",
    topic_type="com.example.chat.message", 
    agent_type="ChatAgent"
)

前缀订阅实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@dataclass  
class TypePrefixSubscription:
    """前缀订阅 - 匹配主题类型前缀"""
    
    id: str
    topic_type_prefix: str  # 主题类型前缀
    agent_type: str         # 目标代理类型
    
    def is_match(self, topic_id: TopicId) -> bool:
        """前缀匹配主题类型"""
        return topic_id.type.startswith(self.topic_type_prefix)
    
    def map_to_agent(self, topic_id: TopicId) -> AgentId:
        """映射到指定代理类型,使用源作为键"""
        if not self.is_match(topic_id):
            raise CantHandleException(f"主题 {topic_id} 不匹配前缀 {self.topic_type_prefix}")
        
        # 从主题源中提取代理键
        # 例如:agent://ChatAgent/user_123 -> user_123
        if topic_id.source.startswith("agent://"):
            agent_path = topic_id.source[8:]  # 移除 "agent://" 前缀
            if "/" in agent_path:
                _, key = agent_path.split("/", 1)
                return AgentId(self.agent_type, key)
        
        return AgentId(self.agent_type, "default")

# 使用示例
system_subscription = TypePrefixSubscription(
    id="system-subscription-001",
    topic_type_prefix="com.microsoft.autogen.system.",
    agent_type="SystemAgent" 
)

2.5 单线程代理运行时

运行时架构设计

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class SingleThreadedAgentRuntime(AgentRuntime):
    """单线程代理运行时
    
    特点:
    - 使用单个asyncio队列处理所有消息
    - 消息按接收顺序处理
    - 每个消息在独立的asyncio任务中并发处理
    - 适用于开发和独立应用,不适合高吞吐量场景
    """
    
    def __init__(
        self,
        intervention_handlers: List[InterventionHandler] | None = None,
        tracer_provider: TracerProvider | None = None,
        ignore_unhandled_exceptions: bool = True,
    ) -> None:
        # 消息队列 - 核心数据结构
        self._message_queue: Queue[
            PublishMessageEnvelope | SendMessageEnvelope | ResponseMessageEnvelope
        ] = Queue()
        
        # 代理管理
        self._agent_factories: Dict[str, Callable[[], Awaitable[Agent]]] = {}
        self._active_agents: Dict[AgentId, Agent] = {}
        
        # 订阅管理
        self._subscription_manager = SubscriptionManager()
        
        # 序列化管理
        self._serialization_registry = SerializationRegistry()
        
        # 运行控制
        self._running = False
        self._stop_event = asyncio.Event()

消息处理核心循环

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def _process_next(self) -> None:
    """处理队列中的下一个消息"""
    try:
        # 从队列获取消息信封
        envelope = await self._message_queue.get()
        
        # 根据消息类型分发处理
        if isinstance(envelope, PublishMessageEnvelope):
            await self._handle_publish_message(envelope)
        elif isinstance(envelope, SendMessageEnvelope):
            await self._handle_send_message(envelope)  
        elif isinstance(envelope, ResponseMessageEnvelope):
            await self._handle_response_message(envelope)
        else:
            logger.warning(f"未知消息信封类型: {type(envelope)}")
    
    except QueueShutDown:
        # 队列已关闭,停止处理
        return
    except Exception as e:
        logger.error(f"处理消息时发生异常: {e}", exc_info=True)
        if not self._ignore_unhandled_exceptions:
            raise
    finally:
        self._message_queue.task_done()

async def _handle_publish_message(self, envelope: PublishMessageEnvelope) -> None:
    """处理发布消息"""
    # 查找匹配的订阅
    subscriptions = self._subscription_manager.get_matching_subscriptions(envelope.topic_id)
    
    # 为每个订阅创建处理任务
    tasks = []
    for subscription in subscriptions:
        try:
            agent_id = subscription.map_to_agent(envelope.topic_id)
            task = asyncio.create_task(
                self._deliver_message_to_agent(
                    agent_id=agent_id,
                    message=envelope.message,
                    sender=envelope.sender,
                    topic_id=envelope.topic_id,
                    is_rpc=False,
                    message_id=envelope.message_id,
                )
            )
            tasks.append(task)
        except Exception as e:
            logger.error(f"创建消息传递任务失败: {e}", exc_info=True)
    
    # 等待所有任务完成
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

async def _handle_send_message(self, envelope: SendMessageEnvelope) -> None:
    """处理点对点消息"""
    try:
        result = await self._deliver_message_to_agent(
            agent_id=envelope.recipient,
            message=envelope.message,
            sender=envelope.sender,
            topic_id=None,
            is_rpc=True,
            message_id=envelope.message_id,
        )
        # 设置返回结果
        envelope.future.set_result(result)
    except Exception as e:
        # 设置异常结果
        envelope.future.set_exception(e)

代理生命周期管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def _get_or_create_agent(self, agent_id: AgentId) -> Agent:
    """获取或创建代理实例"""
    
    # 检查活跃代理缓存
    if agent_id in self._active_agents:
        return self._active_agents[agent_id]
    
    # 检查代理工厂
    if agent_id.type not in self._agent_factories:
        raise LookupError(f"未找到代理类型 '{agent_id.type}' 的工厂函数")
    
    # 创建代理实例
    factory = self._agent_factories[agent_id.type] 
    
    # 设置实例化上下文
    with AgentInstantiationContext.populate_context(agent_id, self):
        agent = await factory()
    
    # 绑定代理ID和运行时
    if hasattr(agent, 'bind_id_and_runtime'):
        await agent.bind_id_and_runtime(agent_id, self)
    
    # 缓存活跃代理
    self._active_agents[agent_id] = agent
    
    logger.info(f"创建代理实例: {agent_id}")
    return agent

async def _deliver_message_to_agent(
    self,
    agent_id: AgentId,
    message: Any,
    sender: AgentId | None,
    topic_id: TopicId | None,
    is_rpc: bool,
    message_id: str,
) -> Any:
    """将消息传递给指定代理"""
    
    # 获取或创建代理
    try:
        agent = await self._get_or_create_agent(agent_id)
    except LookupError as e:
        logger.error(f"无法找到代理 {agent_id}: {e}")
        raise UndeliverableException(f"代理不存在: {agent_id}") from e
    
    # 构建消息上下文
    context = MessageContext(
        sender=sender,
        topic_id=topic_id,
        is_rpc=is_rpc,
        cancellation_token=CancellationToken(),
        message_id=message_id,
    )
    
    # 调用代理处理消息
    try:
        result = await agent.on_message(message, context)
        logger.debug(f"代理 {agent_id} 成功处理消息: {message_id}")
        return result
    except CantHandleException as e:
        logger.warning(f"代理 {agent_id} 无法处理消息: {e}")
        raise
    except Exception as e:
        logger.error(f"代理 {agent_id} 处理消息时发生异常: {e}", exc_info=True)
        raise

运行控制机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class RunContext:
    """运行上下文 - 控制运行时的启动和停止"""
    
    def __init__(self, runtime: SingleThreadedAgentRuntime) -> None:
        self._runtime = runtime
        self._run_task = asyncio.create_task(self._run())
        self._stopped = asyncio.Event()
    
    async def _run(self) -> None:
        """主运行循环"""
        while True:
            if self._stopped.is_set():
                return
            await self._runtime._process_next()
    
    async def stop(self) -> None:
        """立即停止运行时"""
        self._stopped.set()
        self._runtime._message_queue.shutdown(immediate=True)
        await self._run_task
    
    async def stop_when_idle(self) -> None:
        """等待队列空闲后停止"""
        await self._runtime._message_queue.join()  # 等待队列为空
        self._stopped.set()
        self._runtime._message_queue.shutdown(immediate=True)
        await self._run_task
    
    async def stop_when(self, condition: Callable[[], bool], check_period: float = 1.0) -> None:
        """满足条件时停止"""
        async def check_condition() -> None:
            while not condition():
                await asyncio.sleep(check_period)
            await self.stop()
        
        await asyncio.create_task(check_condition())

# 运行时使用示例
def start(self) -> RunContext:
    """启动运行时"""
    if self._running:
        raise RuntimeError("运行时已经启动")
    
    self._running = True
    return RunContext(self)

2.6 基础代理实现

BaseAgent抽象基类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class BaseAgent(ABC, Agent):
    """基础代理抽象类 - 提供通用功能"""
    
    # 类级别的订阅和类型处理器存储
    internal_unbound_subscriptions_list: ClassVar[List[UnboundSubscription]] = []
    internal_extra_handles_types: ClassVar[List[Tuple[Type[Any], List[MessageSerializer[Any]]]]] = []
    
    def __init__(self, description: str) -> None:
        """初始化基础代理
        
        Args:
            description: 代理描述信息
        """
        # 在工厂调用上下文中自动绑定ID和运行时
        if AgentInstantiationContext.is_in_factory_call():
            self._runtime: AgentRuntime = AgentInstantiationContext.current_runtime()
            self._id = AgentInstantiationContext.current_agent_id()
        
        if not isinstance(description, str):
            raise ValueError("代理描述必须是字符串")
        
        self._description = description
    
    @property
    def metadata(self) -> AgentMetadata:
        """代理元数据"""
        assert self._id is not None
        return AgentMetadata(
            key=self._id.key,
            type=self._id.type, 
            description=self._description
        )
    
    @final
    async def on_message(self, message: Any, ctx: MessageContext) -> Any:
        """消息处理入口点 - 不可重写"""
        return await self.on_message_impl(message, ctx)
    
    @abstractmethod
    async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any:
        """消息处理实现 - 子类必须重写"""
        ...
    
    # 便利方法
    async def send_message(
        self,
        message: Any,
        recipient: AgentId,
        *,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> Any:
        """发送消息到指定代理"""
        if cancellation_token is None:
            cancellation_token = CancellationToken()
        
        return await self._runtime.send_message(
            message,
            sender=self.id,
            recipient=recipient,
            cancellation_token=cancellation_token,
            message_id=message_id,
        )
    
    async def publish_message(
        self,
        message: Any,
        topic_id: TopicId,
        *,
        cancellation_token: CancellationToken | None = None,
    ) -> None:
        """发布消息到主题"""
        await self._runtime.publish_message(
            message, 
            topic_id, 
            sender=self.id, 
            cancellation_token=cancellation_token
        )

RoutedAgent路由代理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class RoutedAgent(BaseAgent):
    """路由代理 - 基于装饰器的消息路由"""
    
    def __init__(self, description: str) -> None:
        super().__init__(description)
        # 构建消息处理器映射
        self._message_handlers = self._build_message_handler_map()
    
    def _build_message_handler_map(self) -> Dict[Type[Any], List[MessageHandler[Any, Any, Any]]]:
        """构建消息类型到处理器的映射"""
        handlers_map: DefaultDict[Type[Any], List[MessageHandler[Any, Any, Any]]] = DefaultDict(list)
        
        # 遍历类的所有方法
        for name in sorted(dir(self.__class__)):
            method = getattr(self.__class__, name)
            
            # 检查是否为消息处理器
            if hasattr(method, 'is_message_handler') and method.is_message_handler:
                # 为每个目标类型注册处理器
                for target_type in method.target_types:
                    handlers_map[target_type].append(method)
        
        return dict(handlers_map)
    
    async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any:
        """基于类型和匹配规则路由消息"""
        message_type = type(message)
        
        # 查找匹配的处理器
        handlers = self._message_handlers.get(message_type, [])
        
        if not handlers:
            raise CantHandleException(f"没有找到处理 {message_type} 类型消息的处理器")
        
        # 应用匹配规则查找第一个匹配的处理器
        for handler in handlers:
            try:
                if handler.router(message, ctx):
                    logger.debug(f"使用处理器 {handler.__name__} 处理消息类型 {message_type}")
                    return await handler(self, message, ctx)
            except Exception as e:
                logger.error(f"处理器 {handler.__name__} 路由检查失败: {e}")
                continue
        
        raise CantHandleException(f"没有匹配的处理器能够处理消息: {message}")

# 使用示例
class ExampleAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("示例代理")
    
    @message_handler
    async def handle_text(self, message: str, ctx: MessageContext) -> str:
        return f"处理文本: {message}"
    
    @event
    async def handle_notification(self, event: dict, ctx: MessageContext) -> None:
        print(f"收到通知: {event}")
    
    @rpc
    async def calculate(self, request: CalculationRequest, ctx: MessageContext) -> CalculationResult:
        result = request.a + request.b
        return CalculationResult(result=result)

3. 高级特性

3.1 高级对话控制机制

自定义回复策略

AutoGen 提供了强大的自定义回复策略机制,允许开发者根据业务需求定制智能体的响应行为:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class CustomReplyStrategy:
    """自定义回复策略 - 基于消息内容智能路由"""
    
    def __init__(self, priority_keywords: List[str], escalation_threshold: int = 3):
        self.priority_keywords = priority_keywords
        self.escalation_threshold = escalation_threshold
        self.message_count = 0
    
    async def __call__(
        self, 
        recipient: Agent, 
        messages: List[BaseChatMessage], 
        sender: Agent, 
        config: Dict[str, Any]
    ) -> Tuple[bool, Optional[str]]:
        """
        自定义回复策略实现
        
        Args:
            recipient: 接收消息的代理
            messages: 消息历史列表
            sender: 发送消息的代理  
            config: 配置参数
            
        Returns:
            Tuple[bool, Optional[str]]: (是否处理, 回复内容)
        """
        
        if not messages:
            return False, None
            
        last_message = messages[-1]
        content = last_message.content.lower()
        
        # 1. 优先级关键词检测
        for keyword in self.priority_keywords:
            if keyword in content:
                priority_response = await self._handle_priority_message(last_message, keyword)
                return True, priority_response
        
        # 2. 情感分析和适应性响应
        sentiment = await self._analyze_sentiment(content)
        if sentiment == "negative" and self.message_count > 2:
            escalation_response = await self._escalate_to_human(last_message)
            return True, escalation_response
        
        # 3. 上下文感知响应
        context = await self._extract_context(messages)
        if context.get("requires_expert"):
            expert_response = await self._route_to_expert(last_message, context)
            return True, expert_response
        
        self.message_count += 1
        return False, None  # 继续默认处理流程
    
    async def _handle_priority_message(self, message: BaseChatMessage, keyword: str) -> str:
        """处理优先级消息"""
        return f"检测到优先级关键词 '{keyword}',正在优先处理您的请求..."
    
    async def _analyze_sentiment(self, content: str) -> str:
        """情感分析"""
        # 简化的情感分析实现
        negative_indicators = ["生气", "愤怒", "不满", "糟糕", "失望"]
        if any(indicator in content for indicator in negative_indicators):
            return "negative"
        return "neutral"
    
    async def _escalate_to_human(self, message: BaseChatMessage) -> str:
        """升级到人工处理"""
        return "我注意到您可能遇到了一些困难,正在为您转接人工客服..."
    
    async def _extract_context(self, messages: List[BaseChatMessage]) -> Dict[str, Any]:
        """提取对话上下文"""
        context = {
            "topic": None,
            "requires_expert": False,
            "user_intent": None
        }
        
        # 分析最近几条消息确定主题
        recent_content = " ".join([msg.content for msg in messages[-3:]])
        
        if any(tech_word in recent_content for tech_word in ["技术", "代码", "编程", "bug"]):
            context["requires_expert"] = True
            context["topic"] = "technical"
        
        return context

# 使用示例
async def setup_custom_reply_strategy():
    """设置自定义回复策略示例"""
    
    # 创建智能体
    assistant = RoutedAgent("智能助手")
    
    # 创建自定义策略
    custom_strategy = CustomReplyStrategy(
        priority_keywords=["紧急", "重要", "立即", "urgent"],
        escalation_threshold=3
    )
    
    # 注册回复策略
    assistant.register_reply_handler(
        trigger_condition=lambda msg: True,  # 触发条件
        reply_function=custom_strategy,
        priority=0  # 最高优先级
    )
    
    return assistant

对话流程控制

AutoGen 提供了灵活的对话流程控制机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class ConversationFlowController:
    """对话流程控制器"""
    
    def __init__(self):
        self.conversation_state = {}
        self.flow_rules = {}
    
    async def interrupt_conversation(self, agent: Agent, reason: str) -> None:
        """中断当前对话"""
        await agent.pause_processing()
        
        # 保存当前状态
        current_state = await agent.save_conversation_state()
        self.conversation_state[agent.id] = {
            'state': current_state,
            'interrupt_reason': reason,
            'timestamp': datetime.utcnow()
        }
        
        # 发送中断通知
        await agent.send_system_message(f"对话已中断: {reason}")
    
    async def redirect_conversation(
        self, 
        from_agent: Agent, 
        to_agent: Agent, 
        message: str,
        preserve_context: bool = True
    ) -> None:
        """重定向对话到另一个代理"""
        
        if preserve_context:
            # 获取对话历史
            conversation_history = await from_agent.get_conversation_history()
            
            # 转移上下文到目标代理
            await to_agent.load_conversation_context(conversation_history)
        
        # 发送重定向消息
        await to_agent.send_message(HandoffMessage(
            target=to_agent.name,
            context=message,
            source=from_agent.name
        ))
        
        # 停止原代理的处理
        await from_agent.stop_reply_processing()
    
    async def resume_conversation(self, agent: Agent) -> None:
        """恢复中断的对话"""
        if agent.id in self.conversation_state:
            saved_state = self.conversation_state[agent.id]
            
            # 恢复对话状态
            await agent.load_conversation_state(saved_state['state'])
            
            # 恢复处理
            await agent.resume_processing()
            
            # 清理保存的状态
            del self.conversation_state[agent.id]

# 使用示例
async def flow_control_example():
    flow_controller = ConversationFlowController()
    
    # 设置流程规则
    flow_controller.add_flow_rule(
        condition=lambda msg: "技术问题" in msg.content,
        action=lambda agent: flow_controller.redirect_conversation(
            agent, tech_expert_agent, "需要技术专家协助"
        )
    )

3.2 组件配置系统

配置驱动的组件实例化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class ComponentModel(BaseModel):
    """组件模型 - 包含实例化组件所需的所有信息"""
    
    provider: str                    # 组件提供者,如 "my_module.MyClass"
    component_type: ComponentType | None = None  # 逻辑类型
    version: int | None = None       # 规范版本
    component_version: int | None = None  # 组件版本
    description: str | None = None   # 描述
    label: str | None = None         # 人类可读标签
    config: dict[str, Any]          # 配置参数

class Component(Generic[ConfigT]):
    """组件基类 - 支持配置驱动的实例化"""
    
    component_config_schema: ClassVar[Type[ConfigT]]
    component_type: ClassVar[ComponentType]
    
    @classmethod
    def _from_config(cls, config: ConfigT) -> Self:
        """从配置创建组件实例"""
        ...
    
    def _to_config(self) -> ConfigT:
        """将组件实例转换为配置"""
        ...

# 使用示例
class ChatAgentConfig(BaseModel):
    name: str
    model: str
    temperature: float = 0.7

class ChatAgent(Component[ChatAgentConfig]):
    component_config_schema = ChatAgentConfig
    component_type = "agent"
    
    def __init__(self, name: str, model: str, temperature: float = 0.7):
        self.name = name
        self.model = model  
        self.temperature = temperature
    
    @classmethod
    def _from_config(cls, config: ChatAgentConfig) -> Self:
        return cls(
            name=config.name,
            model=config.model,
            temperature=config.temperature
        )
    
    def _to_config(self) -> ChatAgentConfig:
        return ChatAgentConfig(
            name=self.name,
            model=self.model,
            temperature=self.temperature
        )

3.2 工具集成系统

FunctionTool实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class FunctionTool(BaseTool[BaseModel, BaseModel], Component[FunctionToolConfig]):
    """函数工具 - 将Python函数包装为代理工具"""
    
    def __init__(
        self,
        func: Callable[..., Any],
        name: str | None = None,
        description: str | None = None,
        global_imports: List[ImportConfig] | None = None,
    ) -> None:
        self._func = func
        self._name = name or func.__name__
        self._description = description or func.__doc__ or ""
        self._global_imports = global_imports or []
        
        # 分析函数签名
        self._signature = inspect.signature(func)
        self._has_cancellation_support = self._check_cancellation_support()
    
    def _check_cancellation_support(self) -> bool:
        """检查函数是否支持取消令牌"""
        return any(
            param.annotation == CancellationToken
            for param in self._signature.parameters.values()
        )
    
    async def run(self, args: BaseModel, cancellation_token: CancellationToken) -> BaseModel:
        """执行工具函数"""
        # 转换参数
        func_kwargs = {}
        for param_name, param in self._signature.parameters.items():
            if param.annotation == CancellationToken:
                func_kwargs[param_name] = cancellation_token
            elif hasattr(args, param_name):
                func_kwargs[param_name] = getattr(args, param_name)
        
        # 执行函数
        try:
            if asyncio.iscoroutinefunction(self._func):
                result = await self._func(**func_kwargs)
            else:
                result = self._func(**func_kwargs)
            
            # 包装结果
            if isinstance(result, BaseModel):
                return result
            else:
                # 创建动态结果模型
                ResultModel = create_model('ToolResult', result=(type(result), result))
                return ResultModel(result=result)
        
        except Exception as e:
            logger.error(f"工具执行失败: {e}", exc_info=True)
            raise
    
    @classmethod
    def _from_config(cls, config: FunctionToolConfig) -> Self:
        """⚠️ 安全警告:从配置加载函数工具会执行代码"""
        warnings.warn(
            "从配置加载FunctionTool会执行代码导入和函数代码。"
            "只从可信源加载配置以防止任意代码执行。",
            UserWarning,
            stacklevel=2,
        )
        
        exec_globals: dict[str, Any] = {}
        
        # 执行导入语句
        for import_stmt in config.global_imports:
            import_code = import_to_str(import_stmt)
            try:
                exec(import_code, exec_globals)
            except Exception as e:
                raise RuntimeError(f"导入失败 {import_code}: {str(e)}") from e
        
        # 执行函数代码
        try:
            exec(config.source_code, exec_globals)
            func_name = config.source_code.split("def ")[1].split("(")[0]
            func = exec_globals[func_name]
        except Exception as e:
            raise ValueError(f"无法编译和加载函数: {e}") from e
        
        return cls(
            func, 
            name=config.name, 
            description=config.description,
            global_imports=config.global_imports
        )

# 使用示例
def add_numbers(a: int, b: int) -> int:
    """将两个数字相加"""
    return a + b

async def fetch_weather(city: str, cancellation_token: CancellationToken) -> str:
    """获取天气信息"""
    # 模拟异步API调用
    await asyncio.sleep(1)
    return f"{city}的天气是晴天"

# 创建工具
add_tool = FunctionTool(add_numbers, description="数学加法工具")
weather_tool = FunctionTool(fetch_weather, description="天气查询工具")

3.3 干预处理系统

消息干预机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class InterventionHandler(Protocol):
    """干预处理器协议 - 在消息发送/发布前进行拦截"""
    
    async def on_send_message(
        self, 
        message: Any, 
        sender: AgentId | None, 
        recipient: AgentId
    ) -> Any | DropMessage:
        """拦截发送消息"""
        ...
    
    async def on_publish_message(
        self, 
        message: Any, 
        sender: AgentId | None, 
        topic_id: TopicId
    ) -> Any | DropMessage:
        """拦截发布消息"""  
        ...

class DropMessage:
    """特殊返回值 - 指示丢弃消息"""
    pass

class DefaultInterventionHandler:
    """默认干预处理器 - 记录但不修改消息"""
    
    async def on_send_message(self, message: Any, sender: AgentId | None, recipient: AgentId) -> Any:
        logger.debug(f"发送消息: {sender} -> {recipient}, 类型: {type(message)}")
        return message
    
    async def on_publish_message(self, message: Any, sender: AgentId | None, topic_id: TopicId) -> Any:
        logger.debug(f"发布消息: {sender} -> {topic_id}, 类型: {type(message)}")
        return message

# 使用示例
class MessageFilterHandler:
    """消息过滤干预处理器"""
    
    def __init__(self, blocked_words: List[str]):
        self.blocked_words = blocked_words
    
    async def on_send_message(self, message: Any, sender: AgentId | None, recipient: AgentId) -> Any | DropMessage:
        if isinstance(message, str):
            # 检查是否包含屏蔽词
            if any(word in message.lower() for word in self.blocked_words):
                logger.warning(f"消息包含屏蔽词,已丢弃: {message[:50]}...")
                return DropMessage()
            
            # 清理消息内容
            clean_message = message
            for word in self.blocked_words:
                clean_message = clean_message.replace(word, "***")
            
            return clean_message
        
        return message
    
    async def on_publish_message(self, message: Any, sender: AgentId | None, topic_id: TopicId) -> Any | DropMessage:
        return await self.on_send_message(message, sender, AgentId("dummy", "dummy"))

4. 性能优化与最佳实践

4.1 异步编程模式

正确的异步代理实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class OptimizedChatAgent(RoutedAgent):
    def __init__(self, model_client: Any):
        super().__init__("优化的聊天代理")
        self.model_client = model_client
        self._conversation_cache = {}  # 对话缓存
    
    @message_handler
    async def handle_chat_message(self, message: ChatMessage, ctx: MessageContext) -> ChatResponse:
        """优化的聊天消息处理"""
        
        # 异步获取上下文
        context_task = asyncio.create_task(self._get_conversation_context(message.user_id))
        
        # 异步预处理消息
        preprocessing_task = asyncio.create_task(self._preprocess_message(message))
        
        # 等待并行任务完成
        context, processed_message = await asyncio.gather(context_task, preprocessing_task)
        
        # 异步调用模型
        try:
            response = await self.model_client.generate_response(
                message=processed_message.content,
                context=context,
                timeout=30.0  # 设置超时
            )
            
            # 异步更新缓存
            asyncio.create_task(self._update_conversation_cache(message.user_id, response))
            
            return ChatResponse(content=response, user_id=message.user_id)
        
        except asyncio.TimeoutError:
            logger.error("模型调用超时")
            return ChatResponse(content="抱歉,处理超时,请稍后重试", user_id=message.user_id)
        except Exception as e:
            logger.error(f"处理聊天消息失败: {e}", exc_info=True)
            return ChatResponse(content="处理出错,请稍后重试", user_id=message.user_id)
    
    async def _get_conversation_context(self, user_id: str) -> dict:
        """异步获取对话上下文"""
        if user_id in self._conversation_cache:
            return self._conversation_cache[user_id]
        
        # 模拟从数据库异步加载
        await asyncio.sleep(0.1)
        context = {"history": [], "preferences": {}}
        self._conversation_cache[user_id] = context
        return context
    
    async def _preprocess_message(self, message: ChatMessage) -> ChatMessage:
        """异步预处理消息"""
        # 模拟异步预处理(如文本清理、实体提取等)
        await asyncio.sleep(0.05)
        return ChatMessage(
            content=message.content.strip(),
            user_id=message.user_id,
            timestamp=message.timestamp
        )
    
    async def _update_conversation_cache(self, user_id: str, response: str) -> None:
        """异步更新对话缓存"""
        if user_id in self._conversation_cache:
            self._conversation_cache[user_id]["history"].append(response)
            
            # 限制历史记录长度
            if len(self._conversation_cache[user_id]["history"]) > 10:
                self._conversation_cache[user_id]["history"] = \
                    self._conversation_cache[user_id]["history"][-10:]

4.2 内存管理优化

代理生命周期管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class ManagedAgentRuntime(SingleThreadedAgentRuntime):
    """带生命周期管理的运行时"""
    
    def __init__(self, max_inactive_time: float = 300.0, **kwargs):
        super().__init__(**kwargs)
        self._max_inactive_time = max_inactive_time  # 5分钟
        self._agent_last_activity: Dict[AgentId, float] = {}
        self._cleanup_task: Task | None = None
    
    async def start_cleanup_task(self) -> None:
        """启动清理任务"""
        if self._cleanup_task is None:
            self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
    
    async def stop_cleanup_task(self) -> None:
        """停止清理任务"""
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            self._cleanup_task = None
    
    async def _periodic_cleanup(self) -> None:
        """定期清理不活跃的代理"""
        while True:
            try:
                await asyncio.sleep(60)  # 每分钟检查一次
                await self._cleanup_inactive_agents()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"清理任务异常: {e}", exc_info=True)
    
    async def _cleanup_inactive_agents(self) -> None:
        """清理不活跃的代理"""
        current_time = asyncio.get_event_loop().time()
        inactive_agents = []
        
        for agent_id, last_activity in self._agent_last_activity.items():
            if current_time - last_activity > self._max_inactive_time:
                inactive_agents.append(agent_id)
        
        for agent_id in inactive_agents:
            if agent_id in self._active_agents:
                agent = self._active_agents[agent_id]
                
                # 调用代理清理方法
                if hasattr(agent, 'close'):
                    try:
                        await agent.close()
                    except Exception as e:
                        logger.error(f"代理 {agent_id} 清理失败: {e}")
                
                # 从活跃代理中移除
                del self._active_agents[agent_id]
                del self._agent_last_activity[agent_id]
                
                logger.info(f"清理不活跃代理: {agent_id}")
    
    async def _deliver_message_to_agent(self, agent_id: AgentId, **kwargs) -> Any:
        """重写消息传递,更新活动时间"""
        # 更新活动时间
        self._agent_last_activity[agent_id] = asyncio.get_event_loop().time()
        
        # 调用父类方法
        return await super()._deliver_message_to_agent(agent_id, **kwargs)

4.3 错误处理与容错

健壮的错误处理机制

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class ResilientAgent(RoutedAgent):
    """具有容错能力的代理"""
    
    def __init__(self, description: str, max_retries: int = 3):
        super().__init__(description)
        self.max_retries = max_retries
        self._circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout=60.0
        )
    
    @message_handler
    async def handle_with_retry(self, message: ProcessingRequest, ctx: MessageContext) -> ProcessingResponse:
        """带重试机制的消息处理"""
        
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                # 检查断路器状态
                if self._circuit_breaker.is_open:
                    raise ServiceUnavailableException("服务断路器已打开")
                
                # 执行处理逻辑
                result = await self._process_request_impl(message)
                
                # 成功时重置断路器
                self._circuit_breaker.record_success()
                
                return ProcessingResponse(
                    result=result,
                    attempt=attempt + 1,
                    success=True
                )
            
            except (ConnectionError, TimeoutError, ServiceUnavailableException) as e:
                last_exception = e
                self._circuit_breaker.record_failure()
                
                if attempt < self.max_retries:
                    # 指数退避
                    delay = min(2 ** attempt, 30)  # 最多等待30秒
                    logger.warning(f"处理失败,{delay}秒后重试 (尝试 {attempt + 1}/{self.max_retries + 1}): {e}")
                    await asyncio.sleep(delay)
                    continue
                else:
                    logger.error(f"处理失败,已达最大重试次数: {e}")
                    break
            
            except Exception as e:
                # 非可重试异常,直接失败
                logger.error(f"处理请求时发生不可重试异常: {e}", exc_info=True)
                return ProcessingResponse(
                    error=str(e),
                    attempt=attempt + 1,
                    success=False
                )
        
        # 所有重试都失败
        return ProcessingResponse(
            error=f"处理失败,已重试{self.max_retries}次: {str(last_exception)}",
            attempt=self.max_retries + 1,
            success=False
        )
    
    async def _process_request_impl(self, request: ProcessingRequest) -> Any:
        """实际的处理逻辑实现"""
        # 模拟可能失败的操作
        if random.random() < 0.3:  # 30%失败率用于演示
            raise ConnectionError("模拟连接失败")
        
        await asyncio.sleep(0.1)  # 模拟处理时间
        return f"处理结果: {request.data}"

class CircuitBreaker:
    """简单的断路器实现"""
    
    def __init__(self, failure_threshold: int, timeout: float):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    @property  
    def is_open(self) -> bool:
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                return False
            return True
        return False
    
    def record_success(self) -> None:
        self.failure_count = 0
        self.state = "CLOSED"
    
    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

5. 总结与最佳实践

5.1 核心设计优势

  1. 类型安全:大量使用Python类型注解和Protocol,提供编译时类型检查
  2. 异步优先:全异步设计,支持高并发消息处理
  3. 可扩展性:插件化架构,支持自定义组件和工具
  4. 标准兼容:遵循CloudEvents等开放标准
  5. 开发友好:装饰器简化开发,丰富的调试和监控能力

5.2 最佳实践建议

代理设计

  • 保持代理职责单一,避免过度复杂的代理逻辑
  • 合理使用消息类型,提供清晰的接口定义
  • 实现适当的错误处理和超时机制
  • 考虑代理的生命周期管理

性能优化

  • 使用异步编程模式,避免阻塞调用
  • 合理设置消息队列大小和并发限制
  • 实现必要的缓存机制
  • 监控内存使用和代理数量

调试和监控

  • 使用结构化日志记录关键事件
  • 实现链路追踪和指标收集
  • 设置适当的告警阈值
  • 定期进行性能测试

通过深入理解AutoGen Python核心模块的设计和实现,开发者可以更好地构建高质量的多代理应用系统。


创建时间: 2025年09月13日

本文档基于autogen-core包源码分析整理