概述
本文档提供了AutoGen的完整API参考,涵盖Python和.NET两种实现的核心接口、参数说明、返回值和使用示例,为开发者提供详细的编程指南。
1. Python API参考
1.1 核心运行时API
SingleThreadedAgentRuntime
class SingleThreadedAgentRuntime(AgentRuntime):
"""单线程代理运行时"""
def __init__(
self,
intervention_handlers: List[InterventionHandler] | None = None,
tracer_provider: TracerProvider | None = None,
ignore_unhandled_exceptions: bool = True,
) -> None:
"""
初始化单线程代理运行时
Args:
intervention_handlers: 干预处理器列表
tracer_provider: 追踪提供者
ignore_unhandled_exceptions: 是否忽略未处理异常
Example:
```python
# 基础用法
runtime = SingleThreadedAgentRuntime()
# 带干预处理器
runtime = SingleThreadedAgentRuntime(
intervention_handlers=[LoggingHandler(), SecurityHandler()]
)
```
"""
async def send_message(
self,
message: Any,
recipient: AgentId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> Any:
"""
发送消息到指定代理
Args:
message: 要发送的消息对象
recipient: 接收方代理ID
sender: 发送方代理ID,如果为None表示来自外部
cancellation_token: 取消令牌
message_id: 消息唯一标识,如果为None会自动生成
Returns:
Any: 接收方代理的响应
Raises:
CantHandleException: 接收方无法处理该消息
UndeliverableException: 消息无法投递
Example:
```python
# 基础消息发送
response = await runtime.send_message(
message="Hello, world!",
recipient=AgentId("ChatAgent", "default")
)
# 带发送方的消息发送
response = await runtime.send_message(
message=ProcessingRequest(data="analyze"),
recipient=AgentId("AnalystAgent", "default"),
sender=AgentId("CoordinatorAgent", "main")
)
```
"""
async def publish_message(
self,
message: Any,
topic_id: TopicId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
"""
发布消息到主题
Args:
message: 要发布的消息对象
topic_id: 目标主题ID
sender: 发送方代理ID
cancellation_token: 取消令牌
message_id: 消息唯一标识
Raises:
UndeliverableException: 消息无法投递
Example:
```python
# 发布事件消息
await runtime.publish_message(
message=UserLoginEvent(user_id="user123"),
topic_id=TopicId("user.login", "auth_service")
)
# 发布系统通知
await runtime.publish_message(
message=SystemNotification(message="系统维护完成"),
topic_id=TopicId("system.notification", "admin")
)
```
"""
async def register_factory(
self,
type: str | AgentType,
agent_factory: Callable[[], T | Awaitable[T]],
*,
expected_class: type[T] | None = None,
) -> AgentType:
"""
注册代理工厂函数
Args:
type: 代理类型名称
agent_factory: 代理工厂函数,返回代理实例
expected_class: 期望的代理类,用于运行时验证
Returns:
AgentType: 注册的代理类型
Example:
```python
# 注册简单代理工厂
await runtime.register_factory(
type="ChatAgent",
agent_factory=lambda: ChatAgent("聊天助手")
)
# 注册带验证的代理工厂
await runtime.register_factory(
type="AnalystAgent",
agent_factory=lambda: AnalystAgent("数据分析师"),
expected_class=AnalystAgent
)
```
"""
def start(self) -> RunContext:
"""
启动运行时
Returns:
RunContext: 运行上下文,用于控制运行时的生命周期
Example:
```python
# 启动运行时
runtime = SingleThreadedAgentRuntime()
run_context = runtime.start()
# 发送消息
await runtime.send_message(message, recipient)
# 停止运行时
await run_context.stop()
```
"""
BaseAgent和RoutedAgent
class BaseAgent(ABC, Agent):
"""基础代理抽象类"""
def __init__(self, description: str) -> None:
"""
初始化基础代理
Args:
description: 代理描述信息
Example:
```python
class MyAgent(BaseAgent):
def __init__(self):
super().__init__("我的自定义代理")
async def on_message_impl(self, message, ctx):
return f"处理消息: {message}"
```
"""
async def send_message(
self,
message: Any,
recipient: AgentId,
*,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> Any:
"""
发送消息到其他代理
Args:
message: 消息内容
recipient: 接收方代理ID
cancellation_token: 取消令牌
message_id: 消息ID
Returns:
Any: 接收方的响应
Example:
```python
class CoordinatorAgent(BaseAgent):
async def coordinate_task(self, task):
# 发送任务到工作代理
result = await self.send_message(
message=WorkTask(task_data=task),
recipient=AgentId("WorkerAgent", "default")
)
return result
```
"""
class RoutedAgent(BaseAgent):
"""路由代理 - 支持装饰器方式的消息处理"""
def __init__(self, description: str) -> None:
"""
初始化路由代理
Args:
description: 代理描述
Example:
```python
class ChatAgent(RoutedAgent):
def __init__(self):
super().__init__("智能聊天代理")
@message_handler
async def handle_text(self, message: str, ctx: MessageContext) -> str:
return f"回复: {message}"
@event
async def handle_notification(self, event: dict, ctx: MessageContext) -> None:
print(f"收到通知: {event}")
```
"""
# 装饰器API
def message_handler(
func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None,
*,
strict: bool = True,
match: None | Callable[[ReceivesT, MessageContext], bool] = None,
) -> MessageHandler[AgentT, ReceivesT, ProducesT]:
"""
消息处理器装饰器
Args:
func: 被装饰的处理函数
strict: 是否启用严格类型检查
match: 二次路由匹配函数
Example:
```python
class MyAgent(RoutedAgent):
@message_handler
async def handle_greeting(self, message: GreetingMessage, ctx: MessageContext) -> str:
return f"你好, {message.name}!"
@message_handler(match=lambda msg, ctx: msg.priority == "high")
async def handle_urgent(self, message: UrgentMessage, ctx: MessageContext) -> str:
return "正在紧急处理..."
```
"""
def event(
func: Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]
) -> MessageHandler[AgentT, ReceivesT, None]:
"""
事件处理器装饰器 - 处理无需返回值的事件
Example:
```python
@event
async def handle_system_event(self, event: SystemEvent, ctx: MessageContext) -> None:
self.logger.info(f"系统事件: {event.type}")
```
"""
def rpc(
func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None,
*,
strict: bool = True,
match: None | Callable[[ReceivesT, MessageContext], bool] = None,
) -> MessageHandler[AgentT, ReceivesT, ProducesT]:
"""
RPC处理器装饰器 - 处理需要返回值的RPC调用
Example:
```python
@rpc
async def calculate(self, request: CalculationRequest, ctx: MessageContext) -> CalculationResponse:
result = request.a + request.b
return CalculationResponse(result=result)
```
"""
1.2 AgentChat API
AssistantAgent
class AssistantAgent(BaseChatAgent, Component[AssistantAgentConfig]):
"""助手代理 - 支持工具使用的智能助手"""
def __init__(
self,
name: str,
model_client: ChatCompletionClient,
tools: List[BaseTool[Any, Any] | Callable[..., Any]] | None = None,
workbench: Workbench | Sequence[Workbench] | None = None,
handoffs: List[HandoffBase | str] | None = None,
model_context: ChatCompletionContext | None = None,
description: str = "一个有用的助手代理",
system_message: str | None = None,
model_client_stream: bool = False,
reflect_on_tool_use: bool | None = None,
output_content_type: type[BaseModel] | None = None,
max_tool_iterations: int = 1,
tool_call_summary_format: str = "{result}",
**kwargs: Any,
) -> None:
"""
初始化助手代理
Args:
name: 代理名称,必须是有效的Python标识符
model_client: 模型客户端实例
tools: 工具列表,可以是BaseTool对象或函数
workbench: 工作台实例,与tools互斥
handoffs: 切换配置列表
model_context: 模型上下文,用于存储对话历史
description: 代理描述
system_message: 系统消息,会添加到每次模型推理的开头
model_client_stream: 是否启用流式模式
reflect_on_tool_use: 是否对工具使用结果进行反思
output_content_type: 结构化输出的类型
max_tool_iterations: 最大工具迭代次数
tool_call_summary_format: 工具调用摘要格式
Example:
```python
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
# 基础助手代理
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(
name="assistant",
model_client=model_client,
description="通用助手代理"
)
# 带工具的助手代理
def get_weather(city: str) -> str:
return f"{city}的天气是晴天"
agent_with_tools = AssistantAgent(
name="weather_assistant",
model_client=model_client,
tools=[get_weather],
description="天气查询助手"
)
# 带结构化输出的代理
class WeatherResponse(BaseModel):
city: str
temperature: float
description: str
structured_agent = AssistantAgent(
name="structured_assistant",
model_client=model_client,
output_content_type=WeatherResponse,
description="结构化响应助手"
)
```
"""
async def run(
self,
*,
task: str,
cancellation_token: CancellationToken | None = None,
) -> TaskResult:
"""
执行任务
Args:
task: 任务描述
cancellation_token: 取消令牌
Returns:
TaskResult: 任务执行结果
Example:
```python
# 执行简单任务
result = await agent.run(task="介绍一下Python编程语言")
# 打印结果
for message in result.messages:
print(f"{message.source}: {message.to_text()}")
```
"""
def run_stream(
self,
*,
task: str,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
"""
流式执行任务
Args:
task: 任务描述
cancellation_token: 取消令牌
Yields:
BaseAgentEvent | BaseChatMessage | TaskResult: 流式事件或最终结果
Example:
```python
from autogen_agentchat.ui import Console
# 流式执行并显示
await Console(agent.run_stream(task="写一篇关于AI的文章"))
# 手动处理流式结果
async for item in agent.run_stream(task="分析数据"):
if isinstance(item, TaskResult):
print("任务完成!")
break
else:
print(f"进度: {item}")
```
"""
Team API
class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]):
"""群聊团队基类"""
def __init__(
self,
name: str,
description: str,
participants: List[ChatAgent | Team],
group_chat_manager_name: str,
group_chat_manager_class: type[SequentialRoutedAgent],
termination_condition: TerminationCondition | None = None,
max_turns: int | None = None,
runtime: AgentRuntime | None = None,
custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None,
emit_team_events: bool = False,
):
"""
初始化群聊团队
Args:
name: 团队名称
description: 团队描述
participants: 参与者列表,可以是ChatAgent或Team
group_chat_manager_name: 群聊管理器名称
group_chat_manager_class: 群聊管理器类
termination_condition: 终止条件
max_turns: 最大轮次
runtime: 运行时实例
custom_message_types: 自定义消息类型
emit_team_events: 是否发送团队事件
Example:
```python
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
# 创建参与者
writer = AssistantAgent("writer", model_client)
reviewer = AssistantAgent("reviewer", model_client)
# 创建团队
team = RoundRobinGroupChat(
name="writing_team",
description="协作写作团队",
participants=[writer, reviewer],
termination_condition=MaxMessageTermination(10)
)
# 运行团队任务
result = await team.run(task="写一篇技术文章")
```
"""
async def run(
self,
*,
task: str,
cancellation_token: CancellationToken | None = None,
) -> TaskResult:
"""
执行团队任务
Args:
task: 任务描述
cancellation_token: 取消令牌
Returns:
TaskResult: 任务执行结果
"""
def run_stream(
self,
*,
task: str,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
"""
流式执行团队任务
Args:
task: 任务描述
cancellation_token: 取消令牌
Yields:
BaseAgentEvent | BaseChatMessage | TaskResult: 流式事件或最终结果
"""
1.3 工具API
class FunctionTool(BaseTool[BaseModel, BaseModel], Component[FunctionToolConfig]):
"""函数工具 - 将Python函数包装为代理工具"""
def __init__(
self,
func: Callable[..., Any],
name: str | None = None,
description: str | None = None,
global_imports: List[ImportConfig] | None = None,
) -> None:
"""
初始化函数工具
Args:
func: 要包装的函数
name: 工具名称,默认使用函数名
description: 工具描述,默认使用函数文档字符串
global_imports: 全局导入配置
Example:
```python
# 简单函数工具
def add_numbers(a: int, b: int) -> int:
'''将两个数字相加'''
return a + b
add_tool = FunctionTool(add_numbers)
# 异步函数工具
async def fetch_data(url: str) -> dict:
'''从URL获取数据'''
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
fetch_tool = FunctionTool(fetch_data)
# 带取消支持的工具
async def long_running_task(
data: str,
cancellation_token: CancellationToken
) -> str:
'''长时间运行的任务'''
for i in range(100):
if cancellation_token.is_cancelled():
raise OperationCanceledException()
await asyncio.sleep(0.1)
return f"处理完成: {data}"
cancellable_tool = FunctionTool(long_running_task)
```
"""
2. .NET API参考
2.1 核心接口
IAgentRuntime
/// <summary>
/// 代理运行时接口
/// </summary>
public interface IAgentRuntime : ISaveState
{
/// <summary>
/// 发送消息到代理并获取响应
/// </summary>
/// <param name="message">要发送的消息</param>
/// <param name="recepient">接收方代理ID</param>
/// <param name="sender">发送方代理ID,为null表示来自外部</param>
/// <param name="messageId">消息唯一标识,为null时自动生成</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>接收方的响应</returns>
/// <exception cref="CantHandleException">接收方无法处理消息</exception>
/// <exception cref="UndeliverableException">消息无法投递</exception>
///
/// <example>
/// <code>
/// // 基础消息发送
/// var response = await runtime.SendMessageAsync(
/// message: "Hello, world!",
/// recepient: new AgentId("ChatAgent", "default")
/// );
///
/// // 带发送方的消息发送
/// var response = await runtime.SendMessageAsync(
/// message: new ProcessingRequest { Data = "analyze" },
/// recepient: new AgentId("AnalystAgent", "default"),
/// sender: new AgentId("CoordinatorAgent", "main")
/// );
/// </code>
/// </example>
ValueTask<object?> SendMessageAsync(
object message,
AgentId recepient,
AgentId? sender = null,
string? messageId = null,
CancellationToken cancellationToken = default);
/// <summary>
/// 发布消息到订阅给定主题的所有代理
/// </summary>
/// <param name="message">要发布的消息</param>
/// <param name="topic">目标主题</param>
/// <param name="sender">发送方代理ID</param>
/// <param name="messageId">消息唯一标识</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>发布操作的异步任务</returns>
///
/// <example>
/// <code>
/// // 发布事件消息
/// await runtime.PublishMessageAsync(
/// message: new UserLoginEvent { UserId = "user123" },
/// topic: new TopicId("user.login", "auth_service")
/// );
/// </code>
/// </example>
ValueTask PublishMessageAsync(
object message,
TopicId topic,
AgentId? sender = null,
string? messageId = null,
CancellationToken cancellationToken = default);
/// <summary>
/// 注册代理工厂函数
/// </summary>
/// <param name="type">代理类型</param>
/// <param name="factoryFunc">代理工厂函数</param>
/// <returns>注册的代理类型</returns>
///
/// <example>
/// <code>
/// // 注册代理工厂
/// await runtime.RegisterAgentFactoryAsync(
/// type: new AgentType("ChatAgent"),
/// factoryFunc: (agentId, runtime) =>
/// {
/// var agent = new ChatAgent(agentId, runtime);
/// return ValueTask.FromResult<IHostableAgent>(agent);
/// }
/// );
/// </code>
/// </example>
ValueTask<AgentType> RegisterAgentFactoryAsync(
AgentType type,
Func<AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc);
/// <summary>
/// 添加订阅
/// </summary>
/// <param name="subscription">订阅定义</param>
/// <returns>添加操作的异步任务</returns>
///
/// <example>
/// <code>
/// // 添加类型订阅
/// await runtime.AddSubscriptionAsync(
/// new TypeSubscription("chat.message", new AgentType("ChatAgent"))
/// );
///
/// // 添加前缀订阅
/// await runtime.AddSubscriptionAsync(
/// new TypePrefixSubscription("system.", new AgentType("SystemAgent"))
/// );
/// </code>
/// </example>
ValueTask AddSubscriptionAsync(ISubscriptionDefinition subscription);
}
BaseAgent
/// <summary>
/// 基础代理抽象类
/// </summary>
public abstract class BaseAgent : IHostableAgent, ISaveState
{
/// <summary>
/// 获取代理的唯一标识符
/// </summary>
public AgentId Id { get; private set; }
/// <summary>
/// 获取代理元数据
/// </summary>
public AgentMetadata Metadata { get; }
/// <summary>
/// 构造函数
/// </summary>
/// <param name="id">代理ID</param>
/// <param name="runtime">运行时实例</param>
/// <param name="description">代理描述</param>
/// <param name="logger">日志记录器</param>
///
/// <example>
/// <code>
/// public class GreetingAgent : BaseAgent, IHandle<GreetingMessage, GreetingResponse>
/// {
/// public GreetingAgent(AgentId id, IAgentRuntime runtime)
/// : base(id, runtime, "问候代理")
/// {
/// }
///
/// public ValueTask<GreetingResponse> HandleAsync(
/// GreetingMessage message,
/// MessageContext messageContext)
/// {
/// var response = new GreetingResponse
/// {
/// Message = $"你好, {message.Name}!"
/// };
/// return ValueTask.FromResult(response);
/// }
/// }
/// </code>
/// </example>
protected BaseAgent(
AgentId id,
IAgentRuntime runtime,
string description,
ILogger<BaseAgent>? logger = null);
/// <summary>
/// 发送消息到其他代理
/// </summary>
/// <param name="message">消息内容</param>
/// <param name="recepient">接收方代理ID</param>
/// <param name="messageId">消息ID</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>接收方的响应</returns>
///
/// <example>
/// <code>
/// // 在代理内部发送消息
/// public async ValueTask<ProcessingResponse> ProcessDataAsync(ProcessingRequest request)
/// {
/// var result = await SendMessageAsync(
/// message: request,
/// recepient: new AgentId("DataProcessor", "default")
/// );
///
/// return (ProcessingResponse)result;
/// }
/// </code>
/// </example>
public ValueTask<object?> SendMessageAsync(
object message,
AgentId recepient,
string? messageId = null,
CancellationToken cancellationToken = default);
/// <summary>
/// 发布消息到主题
/// </summary>
/// <param name="message">消息内容</param>
/// <param name="topic">目标主题</param>
/// <param name="messageId">消息ID</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>发布操作的异步任务</returns>
public ValueTask PublishMessageAsync(
object message,
TopicId topic,
string? messageId = null,
CancellationToken cancellationToken = default);
}
2.2 消息处理接口
/// <summary>
/// 消息处理接口 - 处理输入类型T,无返回值
/// </summary>
/// <typeparam name="T">输入消息类型</typeparam>
///
/// <example>
/// <code>
/// public class NotificationAgent : BaseAgent, IHandle<NotificationMessage>
/// {
/// public async ValueTask HandleAsync(NotificationMessage message, MessageContext messageContext)
/// {
/// // 处理通知消息
/// await SendNotificationAsync(message.Content);
/// }
/// }
/// </code>
/// </example>
public interface IHandle<in T>
{
/// <summary>
/// 异步处理指定类型的消息
/// </summary>
/// <param name="item">要处理的消息</param>
/// <param name="messageContext">消息上下文</param>
/// <returns>处理操作的异步任务</returns>
ValueTask HandleAsync(T item, MessageContext messageContext);
}
/// <summary>
/// 消息处理接口 - 处理输入类型InT,返回类型OutT
/// </summary>
/// <typeparam name="InT">输入消息类型</typeparam>
/// <typeparam name="OutT">输出结果类型</typeparam>
///
/// <example>
/// <code>
/// public class CalculatorAgent : BaseAgent, IHandle<CalculationRequest, CalculationResponse>
/// {
/// public ValueTask<CalculationResponse> HandleAsync(
/// CalculationRequest request,
/// MessageContext messageContext)
/// {
/// var result = request.A + request.B;
/// return ValueTask.FromResult(new CalculationResponse { Result = result });
/// }
/// }
/// </code>
/// </example>
public interface IHandle<in InT, OutT>
{
/// <summary>
/// 异步处理指定类型的消息并返回结果
/// </summary>
/// <param name="item">要处理的消息</param>
/// <param name="messageContext">消息上下文</param>
/// <returns>处理结果的异步任务</returns>
ValueTask<OutT> HandleAsync(InT item, MessageContext messageContext);
}
2.3 订阅API
/// <summary>
/// 类型订阅 - 基于精确类型匹配
/// </summary>
///
/// <example>
/// <code>
/// // 创建类型订阅
/// var subscription = new TypeSubscription(
/// topicType: "chat.message",
/// agentType: new AgentType("ChatAgent")
/// );
///
/// await runtime.AddSubscriptionAsync(subscription);
/// </code>
/// </example>
public class TypeSubscription : ISubscriptionDefinition
{
/// <summary>
/// 构造函数
/// </summary>
/// <param name="topicType">要匹配的精确主题类型</param>
/// <param name="agentType">处理此订阅的代理类型</param>
/// <param name="id">订阅唯一标识,为null时自动生成</param>
public TypeSubscription(string topicType, AgentType agentType, string? id = null);
/// <summary>
/// 检查主题是否匹配
/// </summary>
/// <param name="topic">要检查的主题</param>
/// <returns>是否匹配</returns>
public bool Matches(TopicId topic);
/// <summary>
/// 将主题映射到代理ID
/// </summary>
/// <param name="topic">主题ID</param>
/// <returns>目标代理ID</returns>
public AgentId MapToAgent(TopicId topic);
}
/// <summary>
/// 前缀订阅 - 基于主题类型前缀匹配
/// </summary>
///
/// <example>
/// <code>
/// // 创建前缀订阅,匹配所有以"system."开头的主题
/// var prefixSubscription = new TypePrefixSubscription(
/// topicTypePrefix: "system.",
/// agentType: new AgentType("SystemAgent")
/// );
///
/// await runtime.AddSubscriptionAsync(prefixSubscription);
/// </code>
/// </example>
public class TypePrefixSubscription : ISubscriptionDefinition
{
/// <summary>
/// 构造函数
/// </summary>
/// <param name="topicTypePrefix">主题类型前缀</param>
/// <param name="agentType">处理此订阅的代理类型</param>
/// <param name="id">订阅唯一标识</param>
public TypePrefixSubscription(string topicTypePrefix, AgentType agentType, string? id = null);
}
3. 配置API
3.1 组件配置
# Python组件配置
class ComponentModel(BaseModel):
"""组件配置模型"""
provider: str
"""组件提供者类路径,如 'my_module.MyClass'"""
component_type: ComponentType | None = None
"""组件逻辑类型"""
config: dict[str, Any]
"""组件配置参数"""
# Example usage:
"""
# 配置模型客户端
model_config = ComponentModel(
provider="autogen_ext.models.openai.OpenAIChatCompletionClient",
component_type="model",
config={
"model": "gpt-4o",
"api_key": "your_api_key",
"temperature": 0.7
}
)
# 配置工具
tool_config = ComponentModel(
provider="autogen_core.tools.FunctionTool",
component_type="tool",
config={
"function": "my_module.my_function",
"description": "我的自定义工具"
}
)
# 配置代理
agent_config = ComponentModel(
provider="autogen_agentchat.agents.AssistantAgent",
component_type="agent",
config={
"name": "assistant",
"model_client": model_config,
"tools": [tool_config],
"description": "配置化的助手代理"
}
)
"""
// .NET配置API示例
public class AutoGenConfiguration
{
/// <summary>
/// 配置AutoGen服务
/// </summary>
/// <param name="services">服务集合</param>
/// <param name="configuration">配置对象</param>
/// <returns>配置后的服务集合</returns>
///
/// <example>
/// <code>
/// public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
/// {
/// // 基础AutoGen服务配置
/// services.ConfigureAutoGen(configuration)
/// .AddInProcessRuntime()
/// .AddGrpcRuntime()
/// .AddMonitoring()
/// .AddSecurity();
///
/// // 注册自定义代理
/// services.AddTransient<ChatAgent>();
/// services.AddSingleton<AgentFactory>();
/// }
/// </code>
/// </example>
public static IServiceCollection ConfigureAutoGen(
this IServiceCollection services,
IConfiguration configuration)
{
// 配置核心服务
services.Configure<AutoGenOptions>(configuration.GetSection("AutoGen"));
// 注册核心服务
services.AddSingleton<IAgentRuntime, InProcessRuntime>();
services.AddSingleton<IServiceRegistry, ServiceRegistry>();
services.AddSingleton<ILoadBalancer, LoadBalancer>();
return services;
}
}
4. 高级API用法
4.1 自定义组件开发
# 自定义代理组件
class CustomAgent(Component[CustomAgentConfig]):
"""自定义代理示例"""
component_config_schema = CustomAgentConfig
component_type = "agent"
def __init__(self, name: str, special_capability: str):
self.name = name
self.special_capability = special_capability
@classmethod
def _from_config(cls, config: CustomAgentConfig) -> Self:
"""从配置创建实例"""
return cls(
name=config.name,
special_capability=config.special_capability
)
def _to_config(self) -> CustomAgentConfig:
"""转换为配置"""
return CustomAgentConfig(
name=self.name,
special_capability=self.special_capability
)
# 自定义工具组件
class CustomTool(BaseTool[CustomToolInput, CustomToolOutput], Component[CustomToolConfig]):
"""自定义工具示例"""
def __init__(self, api_endpoint: str, api_key: str):
self.api_endpoint = api_endpoint
self.api_key = api_key
async def run(
self,
args: CustomToolInput,
cancellation_token: CancellationToken
) -> CustomToolOutput:
"""执行工具逻辑"""
# 调用外部API
async with aiohttp.ClientSession() as session:
headers = {"Authorization": f"Bearer {self.api_key}"}
async with session.post(
self.api_endpoint,
json=args.model_dump(),
headers=headers
) as response:
result_data = await response.json()
return CustomToolOutput(**result_data)
4.2 中间件开发
class CustomMiddleware:
"""自定义中间件示例"""
def __init__(self, config: MiddlewareConfig):
self.config = config
self.request_count = 0
async def __call__(
self,
message: Any,
context: MessageContext,
next_handler: Callable
) -> Any:
"""中间件处理逻辑"""
self.request_count += 1
start_time = time.time()
try:
# 预处理
processed_message = await self._preprocess_message(message, context)
# 调用下一个处理器
result = await next_handler(processed_message, context)
# 后处理
final_result = await self._postprocess_result(result, context)
# 记录成功指标
processing_time = time.time() - start_time
await self._record_success_metrics(processing_time)
return final_result
except Exception as e:
# 错误处理
await self._handle_error(e, message, context)
raise
async def _preprocess_message(self, message: Any, context: MessageContext) -> Any:
"""消息预处理"""
# 示例:添加时间戳
if hasattr(message, '__dict__'):
message.processed_at = datetime.utcnow()
return message
async def _postprocess_result(self, result: Any, context: MessageContext) -> Any:
"""结果后处理"""
# 示例:添加处理统计
if hasattr(result, '__dict__'):
result.processing_stats = {
'request_count': self.request_count,
'processed_by': self.__class__.__name__
}
return result
# 中间件注册
async def register_middleware_example():
runtime = SingleThreadedAgentRuntime()
# 添加自定义中间件
middleware = CustomMiddleware(MiddlewareConfig(enabled=True))
runtime.add_middleware(middleware)
return runtime
5. 错误处理API
5.1 异常类型
# AutoGen异常层次结构
class AutoGenException(Exception):
"""AutoGen基础异常类"""
pass
class CantHandleException(AutoGenException):
"""代理无法处理消息异常"""
def __init__(self, message: str, agent_id: Optional[AgentId] = None):
super().__init__(message)
self.agent_id = agent_id
class UndeliverableException(AutoGenException):
"""消息无法投递异常"""
def __init__(self, message: str, target_agent: Optional[AgentId] = None):
super().__init__(message)
self.target_agent = target_agent
class MessageDroppedException(AutoGenException):
"""消息被丢弃异常"""
def __init__(self, reason: str):
super().__init__(f"消息被丢弃: {reason}")
self.reason = reason
# 使用示例
async def error_handling_example():
"""错误处理示例"""
try:
result = await runtime.send_message(message, recipient)
except CantHandleException as e:
logger.warning(f"代理 {e.agent_id} 无法处理消息: {e}")
# 尝试其他代理或降级处理
result = await fallback_processing(message)
except UndeliverableException as e:
logger.error(f"消息无法投递到 {e.target_agent}: {e}")
# 重试或报告错误
await retry_or_report_error(message, e.target_agent)
except MessageDroppedException as e:
logger.info(f"消息被干预处理器丢弃: {e.reason}")
# 记录丢弃事件
await record_message_drop_event(message, e.reason)
5.2 .NET异常处理
// .NET异常类型
public class AutoGenException : Exception
{
public AutoGenException(string message) : base(message) { }
public AutoGenException(string message, Exception innerException) : base(message, innerException) { }
}
public class CantHandleException : AutoGenException
{
public AgentId? AgentId { get; }
/// <example>
/// <code>
/// try
/// {
/// var result = await runtime.SendMessageAsync(message, recipient);
/// }
/// catch (CantHandleException ex)
/// {
/// _logger.LogWarning("代理 {AgentId} 无法处理消息: {Message}", ex.AgentId, ex.Message);
/// // 实施降级处理
/// result = await FallbackProcessing(message);
/// }
/// </code>
/// </example>
public CantHandleException(string message, AgentId? agentId = null) : base(message)
{
AgentId = agentId;
}
}
public class UndeliverableException : AutoGenException
{
public AgentId? TargetAgent { get; }
public UndeliverableException(string message, AgentId? targetAgent = null) : base(message)
{
TargetAgent = targetAgent;
}
}
6. 扩展和集成API
6.1 模型客户端集成
# 自定义模型客户端
class CustomModelClient(ChatCompletionClient):
"""自定义模型客户端示例"""
def __init__(self, api_endpoint: str, api_key: str, model_name: str):
self.api_endpoint = api_endpoint
self.api_key = api_key
self.model_name = model_name
async def create(
self,
messages: Sequence[LLMMessage],
tools: Sequence[Tool | Callable[..., Any]] | None = None,
json_output_schema: BaseModel | None = None,
extra_create_args: Mapping[str, Any] | None = None,
cancellation_token: CancellationToken | None = None,
) -> CreateResult:
"""创建聊天完成"""
# 构造API请求
request_data = {
"model": self.model_name,
"messages": [self._convert_message(msg) for msg in messages],
"tools": [self._convert_tool(tool) for tool in (tools or [])],
"stream": False
}
# 调用自定义API
async with aiohttp.ClientSession() as session:
headers = {"Authorization": f"Bearer {self.api_key}"}
async with session.post(
f"{self.api_endpoint}/chat/completions",
json=request_data,
headers=headers
) as response:
response_data = await response.json()
# 转换响应格式
return CreateResult(
content=response_data["choices"][0]["message"]["content"],
usage=RequestUsage(
prompt_tokens=response_data["usage"]["prompt_tokens"],
completion_tokens=response_data["usage"]["completion_tokens"]
)
)
# 使用自定义模型客户端
async def custom_model_example():
custom_client = CustomModelClient(
api_endpoint="https://api.example.com/v1",
api_key="your_api_key",
model_name="custom-model-v1"
)
agent = AssistantAgent(
name="custom_assistant",
model_client=custom_client,
description="使用自定义模型的助手"
)
result = await agent.run(task="使用自定义模型回答问题")
return result
6.2 存储后端集成
class CustomStateStorage(StateStorage):
"""自定义状态存储后端"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.connection_pool = None
async def save_state_snapshot(self, snapshot: StateSnapshot) -> bool:
"""保存状态快照"""
async with self._get_connection() as conn:
try:
await conn.execute("""
INSERT INTO agent_states (agent_id, state_data, version, timestamp, checksum)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (agent_id) DO UPDATE SET
state_data = EXCLUDED.state_data,
version = EXCLUDED.version,
timestamp = EXCLUDED.timestamp,
checksum = EXCLUDED.checksum
""",
str(snapshot.agent_id),
json.dumps(snapshot.state_data),
snapshot.version,
snapshot.timestamp,
snapshot.checksum
)
return True
except Exception as e:
logger.error(f"保存状态快照失败: {e}")
return False
async def load_state_snapshot(self, agent_id: AgentId, version: Optional[int] = None) -> Optional[StateSnapshot]:
"""加载状态快照"""
async with self._get_connection() as conn:
if version:
row = await conn.fetchrow("""
SELECT * FROM agent_states
WHERE agent_id = $1 AND version = $2
""", str(agent_id), version)
else:
row = await conn.fetchrow("""
SELECT * FROM agent_states
WHERE agent_id = $1
ORDER BY version DESC LIMIT 1
""", str(agent_id))
if row:
return StateSnapshot(
agent_id=AgentId.from_str(row['agent_id']),
state_data=json.loads(row['state_data']),
version=row['version'],
timestamp=row['timestamp'],
checksum=row['checksum']
)
return None
7. 使用模式和最佳实践
7.1 代理注册模式
# 模式1: 类方法注册
class ChatAgent(RoutedAgent):
@classmethod
async def create_and_register(cls, runtime: AgentRuntime, name: str) -> AgentType:
"""创建并注册代理的便利方法"""
agent_type = await cls.register(
runtime=runtime,
type=name,
factory=lambda: cls(f"{name}代理")
)
return agent_type
# 模式2: 工厂类注册
class AgentFactory:
"""代理工厂类"""
@staticmethod
async def register_all_agents(runtime: AgentRuntime) -> Dict[str, AgentType]:
"""批量注册所有代理类型"""
agents_config = {
"ChatAgent": lambda: ChatAgent("聊天助手"),
"AnalystAgent": lambda: AnalystAgent("数据分析师"),
"CoderAgent": lambda: CoderAgent("代码助手")
}
registered_types = {}
for agent_name, factory_func in agents_config.items():
agent_type = await runtime.register_factory(agent_name, factory_func)
registered_types[agent_name] = agent_type
return registered_types
# 模式3: 装饰器注册
def auto_register(agent_type: str):
"""自动注册装饰器"""
def decorator(cls):
cls._auto_register_type = agent_type
return cls
return decorator
@auto_register("SmartAgent")
class SmartAgent(RoutedAgent):
def __init__(self):
super().__init__("智能代理")
# 自动发现和注册
async def auto_register_agents(runtime: AgentRuntime):
"""自动发现和注册带装饰器的代理"""
import inspect
import sys
for name, obj in inspect.getmembers(sys.modules[__name__]):
if (inspect.isclass(obj) and
issubclass(obj, RoutedAgent) and
hasattr(obj, '_auto_register_type')):
await runtime.register_factory(
obj._auto_register_type,
lambda cls=obj: cls()
)
7.2 消息模式
# 请求-响应模式
@dataclass
class ProcessingRequest:
data: str
options: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ProcessingResponse:
result: str
metadata: Dict[str, Any] = field(default_factory=dict)
class ProcessorAgent(RoutedAgent):
@rpc
async def process(self, request: ProcessingRequest, ctx: MessageContext) -> ProcessingResponse:
# 处理逻辑
processed_data = await self._process_data(request.data, request.options)
return ProcessingResponse(
result=processed_data,
metadata={"processing_time": time.time()}
)
# 事件通知模式
@dataclass
class UserActionEvent:
user_id: str
action_type: str
timestamp: datetime = field(default_factory=datetime.utcnow)
class UserActionListener(RoutedAgent):
@event
async def on_user_action(self, event: UserActionEvent, ctx: MessageContext) -> None:
# 处理用户行为事件
await self._log_user_action(event)
await self._update_user_analytics(event)
# 发布事件
await runtime.publish_message(
UserActionEvent(user_id="user123", action_type="login"),
TopicId("user.action", "user_service")
)
# 流式处理模式
class StreamingAgent(RoutedAgent):
@message_handler
async def handle_streaming_request(
self,
request: StreamingRequest,
ctx: MessageContext
) -> AsyncGenerator[StreamingChunk, None]:
"""流式处理示例"""
for i in range(request.chunk_count):
chunk = StreamingChunk(
id=i,
data=f"处理块 {i}",
is_final=(i == request.chunk_count - 1)
)
yield chunk
await asyncio.sleep(0.1) # 模拟处理时间
8. 配置参考
8.1 环境配置
# config/autogen.yaml - 完整配置示例
autogen:
# 运行时配置
runtime:
type: "SingleThreadedAgentRuntime"
max_concurrent_agents: 100
message_queue_size: 10000
ignore_unhandled_exceptions: true
# 代理配置
agents:
chat_agent:
provider: "autogen_agentchat.agents.AssistantAgent"
config:
name: "assistant"
description: "通用助手代理"
model_client:
provider: "autogen_ext.models.openai.OpenAIChatCompletionClient"
config:
model: "gpt-4o"
api_key: "${OPENAI_API_KEY}"
temperature: 0.7
analyst_agent:
provider: "custom_agents.AnalystAgent"
config:
name: "analyst"
database_url: "${DATABASE_URL}"
# 工具配置
tools:
weather_tool:
provider: "autogen_core.tools.FunctionTool"
config:
function: "weather_api.get_weather"
description: "获取天气信息"
# 监控配置
monitoring:
enabled: true
metrics_interval: 30
health_check_interval: 10
# 日志配置
logging:
level: "INFO"
format: "structured"
output:
- "console"
- "file:/var/log/autogen/autogen.log"
8.2 程序化配置
# 程序化配置示例
from autogen_core import ComponentLoader
async def setup_autogen_from_config(config_path: str) -> AgentRuntime:
"""从配置文件设置AutoGen"""
# 1. 加载配置
loader = ComponentLoader()
config = await loader.load_config(config_path)
# 2. 创建运行时
runtime_config = config['autogen']['runtime']
runtime = await loader.create_component(runtime_config)
# 3. 注册代理
for agent_name, agent_config in config['autogen']['agents'].items():
agent_factory = await loader.create_factory(agent_config)
await runtime.register_factory(agent_name, agent_factory)
# 4. 设置工具
tools = {}
for tool_name, tool_config in config['autogen']['tools'].items():
tool = await loader.create_component(tool_config)
tools[tool_name] = tool
# 5. 配置监控
if config['autogen']['monitoring']['enabled']:
monitoring_service = await setup_monitoring(config['autogen']['monitoring'])
runtime.add_service(monitoring_service)
return runtime
9. 测试API
9.1 单元测试
import pytest
from autogen_core import SingleThreadedAgentRuntime, AgentId
from autogen_core.testing import MockAgent, TestContext
@pytest.fixture
async def test_runtime():
"""测试运行时fixture"""
runtime = SingleThreadedAgentRuntime()
yield runtime
await runtime.stop()
@pytest.fixture
async def test_agent():
"""测试代理fixture"""
return MockAgent("test_agent", responses={"hello": "world"})
class TestAgentCommunication:
"""代理通信测试"""
async def test_basic_message_sending(self, test_runtime, test_agent):
"""测试基础消息发送"""
# 注册测试代理
await test_agent.register(test_runtime, "TestAgent", lambda: test_agent)
# 发送消息
response = await test_runtime.send_message(
message="hello",
recipient=AgentId("TestAgent", "default")
)
# 验证响应
assert response == "world"
async def test_message_publishing(self, test_runtime):
"""测试消息发布"""
# 创建订阅计数器
subscription_counter = SubscriptionCounter()
await subscription_counter.register(test_runtime, "Counter", lambda: subscription_counter)
# 添加订阅
await test_runtime.add_subscription(
TypeSubscription("test.event", "Counter")
)
# 发布消息
await test_runtime.publish_message(
message={"event": "test"},
topic_id=TopicId("test.event", "test_source")
)
# 验证收到消息
assert subscription_counter.count == 1
async def test_agent_lifecycle(self, test_runtime):
"""测试代理生命周期"""
# 注册代理
await test_runtime.register_factory(
"LifecycleAgent",
lambda: LifecycleTestAgent()
)
# 触发代理创建
agent_id = await test_runtime.get(AgentType("LifecycleAgent"))
# 验证代理已创建
assert agent_id in test_runtime._active_agents
# 测试状态保存
await test_runtime.agent_save_state(agent_id)
# 测试状态加载
state = await test_runtime.agent_load_state(agent_id, {})
10. 总结
10.1 API设计原则
- 一致性: Python和.NET API保持一致的设计模式
- 类型安全: 大量使用类型注解和泛型
- 异步优先: 所有I/O操作都是异步的
- 可扩展性: 支持自定义组件和中间件
- 错误处理: 明确的异常层次和错误处理机制
10.2 使用建议
- 开发阶段: 使用SingleThreadedAgentRuntime进行快速原型开发
- 测试阶段: 利用测试工具和Mock对象进行全面测试
- 生产阶段: 使用分布式运行时和完整的监控配置
- 调试阶段: 启用详细日志和追踪功能
10.3 性能最佳实践
- 合理配置: 根据实际负载配置适当的并发参数
- 资源管理: 正确使用取消令牌和生命周期管理
- 缓存策略: 合理使用缓存减少重复计算
- 批处理: 在高吞吐场景下使用消息批处理
10.4 关键函数:核心代码要点、调用链与时序图
- Python | SingleThreadedAgentRuntime.send_message / publish_message
class SingleThreadedAgentRuntime(AgentRuntime):
async def send_message(self, message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) -> Any:
# 路由到目标代理 → 激活/获取 → on_message → 返回结果(异常:CantHandle/Undeliverable)
...
async def publish_message(self, message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) -> None:
# 查询订阅者 → 逐一投递(并发)
...
调用链(典型):
- Client →
send_message
→ 路由/投递 → 目标代理on_message
→ 返回 - Client →
publish_message
→ 匹配订阅者 → 分发
时序图:
sequenceDiagram
participant C as Client
participant RT as Runtime
participant AG as Agent
C->>RT: send_message(msg, recipient)
RT->>AG: on_message(msg)
AG-->>RT: result
RT-->>C: result
- .NET | HandlerInvoker(统一 ValueTask 调用封装)
public sealed class HandlerInvoker
{
// 见前文:TypeEraseAwait<T> 与 ReturnType 分支,实现 ValueTask 与 ValueTask<T> 的统一封装
}
10.5 关键结构体与类:结构图与继承关系
classDiagram
class AgentRuntime
class SingleThreadedAgentRuntime
class BaseAgent
class RoutedAgent
class MessageHandler
AgentRuntime <|-- SingleThreadedAgentRuntime
BaseAgent <|-- RoutedAgent
RoutedAgent --> MessageHandler : uses