AutoGen-01-PythonCore-API

核心API列表

本模块提供的核心API按功能分类如下:

代理相关API

  • Agent:代理协议定义
  • BaseAgent:代理基础实现类
  • RoutedAgent:基于路由的代理实现类

运行时API

  • AgentRuntime:运行时协议定义
  • SingleThreadedAgentRuntime:单线程运行时实现

消息处理API

  • message_handler:通用消息处理装饰器
  • event:事件消息处理装饰器
  • rpc:RPC消息处理装饰器

基础设施API

  • AgentId:代理唯一标识
  • MessageContext:消息上下文
  • CancellationToken:取消令牌

API详细规格

Agent协议

基本信息

  • 名称Agent
  • 类型:Protocol协议定义
  • 作用:定义代理的核心接口规范

接口定义

@runtime_checkable
class Agent(Protocol):
    @property
    def metadata(self) -> AgentMetadata:
        """获取代理元数据,包含类型和描述信息"""
        
    @property  
    def id(self) -> AgentId:
        """获取代理唯一标识"""
        
    async def bind_id_and_runtime(self, id: AgentId, runtime: AgentRuntime) -> None:
        """绑定代理ID和运行时实例"""
        
    async def on_message(self, message: Any, ctx: MessageContext) -> Any:
        """处理接收到的消息,返回响应结果"""
        
    async def save_state(self) -> Mapping[str, Any]:
        """保存代理状态为JSON可序列化对象"""
        
    async def load_state(self, state: Mapping[str, Any]) -> None:
        """从保存的状态中恢复代理状态"""
        
    async def close(self) -> None:
        """清理代理资源,在运行时关闭时调用"""

方法说明

方法 参数 返回值 说明
metadata AgentMetadata 代理元数据,包含key、type、description
id AgentId 代理唯一标识,格式为type:key
bind_id_and_runtime id, runtime None 将代理实例绑定到特定运行时
on_message message, ctx Any 消息处理入口,业务逻辑核心
save_state Mapping[str, Any] 状态持久化,必须JSON兼容
load_state state None 状态恢复,与save_state配对
close None 资源清理,可选实现

实现示例

class SimpleAgent:
    def __init__(self, description: str):
        self._description = description
        self._state = {}
    
    @property
    def metadata(self) -> AgentMetadata:
        return AgentMetadata(
            key=self._id.key,
            type=self._id.type, 
            description=self._description
        )
    
    async def on_message(self, message: Any, ctx: MessageContext) -> Any:
        # 1. 消息类型检查
        if isinstance(message, TextMessage):
            # 2. 业务逻辑处理
            response = f"收到消息: {message.content}"
            # 3. 状态更新
            self._state['last_message'] = message.content
            return TextResponse(content=response)
        
        # 4. 未知消息类型处理
        raise CantHandleException(f"不支持的消息类型: {type(message)}")

BaseAgent基础实现

基本信息

  • 名称BaseAgent
  • 类型:抽象基类
  • 继承:实现Agent协议
  • 作用:提供代理的基础实现和注册机制

核心方法

class BaseAgent(ABC, Agent):
    def __init__(self, description: str) -> None:
        """初始化代理,设置描述信息"""
        
    async def bind_id_and_runtime(self, id: AgentId, runtime: AgentRuntime) -> None:
        """绑定代理ID和运行时,确保唯一性"""
        
    @abstractmethod
    async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any:
        """抽象消息处理方法,子类必须实现"""
        
    async def send_message(self, message: Any, recipient: AgentId, 
                          cancellation_token: CancellationToken = None) -> Any:
        """发送消息到指定代理"""
        
    async def publish_message(self, message: Any, topic_id: TopicId,
                             cancellation_token: CancellationToken = None) -> None:
        """发布消息到主题"""
    
    @classmethod
    async def register(cls, runtime: AgentRuntime, type: str, 
                      factory: Callable[[], Self]) -> AgentType:
        """注册代理类型到运行时"""

注册机制详解

# 代理工厂注册示例
async def register_agent_example():
    runtime = SingleThreadedAgentRuntime()
    
    # 1. 定义代理工厂函数
    def create_echo_agent() -> EchoAgent:
        return EchoAgent("回声代理")
    
    # 2. 注册代理类型
    agent_type = await EchoAgent.register(
        runtime=runtime,
        type="echo_agent",  # 代理类型名称
        factory=create_echo_agent  # 工厂函数
    )
    
    # 3. 启动运行时
    await runtime.start()
    
    # 4. 创建代理实例
    agent_id = await runtime.get("echo_agent", key="default")
    
    # 5. 发送消息
    response = await runtime.send_message(
        message=TextMessage("你好"),
        recipient=agent_id
    )

订阅机制

# 使用装饰器定义订阅
@subscription_factory(TypeSubscription("user_message", "echo_agent"))
class EchoAgent(BaseAgent):
    async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any:
        if isinstance(message, UserMessage):
            return EchoResponse(content=f"回声: {message.content}")
        return None

RoutedAgent路由代理

基本信息

  • 名称RoutedAgent
  • 类型:具体实现类
  • 继承:BaseAgent
  • 作用:基于装饰器的消息路由和处理

路由装饰器

class ChatAgent(RoutedAgent):
    def __init__(self):
        super().__init__("聊天代理")
    
    @event
    async def handle_user_message(self, message: UserMessage, ctx: MessageContext) -> None:
        """处理用户消息事件(无返回值)"""
        print(f"用户说: {message.content}")
        # 发布响应事件
        await self.publish_message(
            BotResponse(content=f"我收到了: {message.content}"),
            ctx.topic_id
        )
    
    @rpc
    async def get_status(self, message: StatusRequest, ctx: MessageContext) -> StatusResponse:
        """处理状态查询RPC(有返回值)"""
        return StatusResponse(
            status="在线",
            message_count=self._message_count
        )
    
    @message_handler(match=lambda msg, ctx: msg.priority == "high")
    async def handle_urgent_message(self, message: UrgentMessage, ctx: MessageContext) -> None:
        """使用条件匹配的消息处理器"""
        print(f"紧急消息: {message.content}")

路由机制说明

装饰器 适用场景 返回值 is_rpc标志
@event 事件通知、异步处理 None False
@rpc 同步调用、需要响应 Any True
@message_handler 通用处理、条件路由 Any 根据上下文

消息路由算法

async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any | None:
    # 1. 按消息类型查找处理器
    message_type = type(message)
    handlers = self._handlers.get(message_type, [])
    
    # 2. 遍历处理器,应用路由条件
    for handler in handlers:
        if handler.router(message, ctx):  # 路由条件匹配
            return await handler(self, message, ctx)
    
    # 3. 没有匹配的处理器
    await self.on_unhandled_message(message, ctx)
    return None

SingleThreadedAgentRuntime运行时

基本信息

  • 名称SingleThreadedAgentRuntime
  • 类型:具体实现类
  • 实现:AgentRuntime协议
  • 作用:单线程异步运行时实现

核心API

class SingleThreadedAgentRuntime(AgentRuntime):
    async def start(self) -> None:
        """启动运行时,开始处理消息队列"""
        
    async def stop(self) -> None:
        """停止运行时,等待处理完成后退出"""
        
    async def send_message(self, message: Any, recipient: AgentId,
                          sender: AgentId = None, 
                          cancellation_token: CancellationToken = None) -> Any:
        """发送消息并等待响应"""
        
    async def publish_message(self, message: Any, topic_id: TopicId,
                             sender: AgentId = None,
                             cancellation_token: CancellationToken = None) -> None:
        """发布消息到订阅者"""
        
    async def register_factory(self, type: str, agent_factory: Callable[[], Agent]) -> AgentType:
        """注册代理工厂"""
        
    async def add_subscription(self, subscription: Subscription) -> None:
        """添加消息订阅"""

消息处理流程

# 发送消息的完整流程
async def send_message_flow():
    # 1. 创建消息信封
    envelope = SendMessageEnvelope(
        message=user_message,
        recipient=target_agent_id,
        sender=sender_agent_id,
        future=asyncio.Future(),
        cancellation_token=token
    )
    
    # 2. 投递到消息队列
    await self._message_queue.put(envelope)
    
    # 3. 异步处理消息
    await self._process_send(envelope)
    
    # 4. 获取目标代理实例
    recipient_agent = await self._get_agent(envelope.recipient)
    
    # 5. 构造消息上下文
    context = MessageContext(
        sender=envelope.sender,
        topic_id=None,  # RPC消息无主题
        is_rpc=True,
        cancellation_token=envelope.cancellation_token
    )
    
    # 6. 调用代理消息处理器
    response = await recipient_agent.on_message(envelope.message, context)
    
    # 7. 设置返回结果
    envelope.future.set_result(response)

发布-订阅流程

async def publish_message_flow():
    # 1. 查找订阅者
    recipients = await self._subscription_manager.get_subscribed_recipients(topic_id)
    
    # 2. 并发发送给所有订阅者
    tasks = []
    for agent_id in recipients:
        # 跳过发送者自己
        if sender and agent_id == sender:
            continue
            
        # 创建消息处理任务
        task = self._send_to_subscriber(agent_id, message, context)
        tasks.append(task)
    
    # 3. 等待所有处理完成
    await asyncio.gather(*tasks, return_exceptions=True)

消息装饰器API

@event装饰器

@event
async def handle_notification(self, message: NotificationMessage, ctx: MessageContext) -> None:
    """事件处理器特征:
    - 无返回值(返回类型必须是None)
    - 用于异步事件通知
    - is_rpc上下文标志为False
    """
    # 业务逻辑处理
    pass

@rpc装饰器

@rpc  
async def process_request(self, message: RequestMessage, ctx: MessageContext) -> ResponseMessage:
    """RPC处理器特征:
    - 必须有返回值
    - 用于同步请求-响应
    - is_rpc上下文标志为True
    """
    return ResponseMessage(result="处理完成")

@message_handler装饰器

@message_handler(match=lambda msg, ctx: msg.category == "important")
async def handle_important(self, message: CategoryMessage, ctx: MessageContext) -> Any:
    """通用处理器特征:
    - 支持条件路由匹配
    - 返回值可选
    - 根据上下文确定处理模式
    """
    if ctx.is_rpc:
        return ProcessResult(status="已处理")
    else:
        # 事件模式,无需返回
        await self.log_important_event(message)
        return None

异常处理与最佳实践

异常类型

  • CantHandleException:代理无法处理特定消息类型
  • UndeliverableException:消息无法投递到目标代理
  • LookupError:代理或订阅不存在
  • MessageDroppedException:消息被干预机制丢弃

最佳实践

代理实现

class RobustAgent(RoutedAgent):
    async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None:
        """重写未处理消息的默认行为"""
        logger.warning(f"未处理的消息类型: {type(message).__name__}")
        
        # 可选:发送错误响应
        if ctx.is_rpc:
            return ErrorResponse(message="不支持的消息类型")
    
    @event
    async def handle_with_error_recovery(self, message: DataMessage, ctx: MessageContext) -> None:
        try:
            await self.process_data(message.data)
        except Exception as e:
            logger.error(f"处理失败: {e}")
            # 发布错误事件
            await self.publish_message(
                ErrorEvent(agent_id=self.id, error=str(e)),
                ctx.topic_id
            )

运行时使用

async def runtime_best_practices():
    runtime = SingleThreadedAgentRuntime()
    
    try:
        # 1. 设置取消令牌
        cancel_token = CancellationToken()
        
        # 2. 带超时的消息发送
        response = await asyncio.wait_for(
            runtime.send_message(message, target_id, cancellation_token=cancel_token),
            timeout=30.0
        )
        
    except asyncio.TimeoutError:
        # 3. 主动取消操作
        cancel_token.cancel()
        logger.warning("消息发送超时")
        
    except CantHandleException:
        logger.error("目标代理无法处理该消息")
        
    finally:
        # 4. 确保运行时正常关闭
        await runtime.stop()