1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class BaseAgent(ABC, Agent):
    """Abstract base class for agents.

    Only the public signatures and their documentation are listed here;
    the method bodies are not shown in this listing.
    """

    def __init__(self, description: str) -> None:
        """Initialize the base agent.

        Args:
            description: Descriptive text for the agent.

        Example:
            ```python
            class MyAgent(BaseAgent):
                def __init__(self):
                    super().__init__("My custom agent")

                async def on_message_impl(self, message, ctx):
                    return f"Handling message: {message}"
            ```
        """

    async def send_message(
        self,
        message: Any,
        recipient: AgentId,
        *,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> Any:
        """Send a message to another agent.

        Args:
            message: The message content.
            recipient: ID of the receiving agent.
            cancellation_token: Cancellation token (optional, keyword-only).
            message_id: Message ID (optional, keyword-only).

        Returns:
            Any: The response from the recipient.

        Example:
            ```python
            class CoordinatorAgent(BaseAgent):
                async def coordinate_task(self, task):
                    # Dispatch the task to a worker agent.
                    result = await self.send_message(
                        message=WorkTask(task_data=task),
                        recipient=AgentId("WorkerAgent", "default")
                    )
                    return result
            ```
        """
class RoutedAgent(BaseAgent):
    """Routed agent: supports decorator-based message handling.

    Only the constructor signature and its documentation are listed here;
    the method body is not shown in this listing.
    """

    def __init__(self, description: str) -> None:
        """Initialize the routed agent.

        Args:
            description: Description of the agent.

        Example:
            ```python
            class ChatAgent(RoutedAgent):
                def __init__(self):
                    super().__init__("Smart chat agent")

                @message_handler
                async def handle_text(self, message: str, ctx: MessageContext) -> str:
                    return f"Reply: {message}"

                @event
                async def handle_notification(self, event: dict, ctx: MessageContext) -> None:
                    print(f"Notification received: {event}")
            ```
        """
# Decorator API
def message_handler(
    func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None,
    *,
    strict: bool = True,
    match: None | Callable[[ReceivesT, MessageContext], bool] = None,
) -> (
    Callable[
        [Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]],
        MessageHandler[AgentT, ReceivesT, ProducesT],
    ]
    | MessageHandler[AgentT, ReceivesT, ProducesT]
):
    """Message handler decorator.

    Can be applied bare (``@message_handler``) or called with keyword
    options (``@message_handler(match=...)``). In the second form ``func``
    is ``None`` and the call has to yield a decorator rather than a
    handler, so the return annotation covers both forms.

    Only the signature and documentation are listed here; the
    implementation body is not shown in this listing.

    Args:
        func: The handler coroutine function being decorated, or ``None``
            when the decorator is invoked with keyword options only.
        strict: Whether strict type checking is enabled.
        match: Secondary routing predicate; receives the message and its
            context and returns a bool.

    Returns:
        A ``MessageHandler`` when ``func`` is supplied; otherwise a
        decorator that accepts the handler function and returns a
        ``MessageHandler``.

    Example:
        ```python
        class MyAgent(RoutedAgent):
            @message_handler
            async def handle_greeting(self, message: GreetingMessage, ctx: MessageContext) -> str:
                return f"Hello, {message.name}!"

            @message_handler(match=lambda msg, ctx: msg.priority == "high")
            async def handle_urgent(self, message: UrgentMessage, ctx: MessageContext) -> str:
                return "Handling urgently..."
        ```
    """
def event(
    func: Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]
) -> MessageHandler[AgentT, ReceivesT, None]:
    """Event handler decorator: handles events that need no return value.

    Only the signature and documentation are listed here; the
    implementation body is not shown in this listing.

    Args:
        func: The handler coroutine function being decorated. Its
            annotated result type is ``None``.

    Example:
        ```python
        @event
        async def handle_system_event(self, event: SystemEvent, ctx: MessageContext) -> None:
            self.logger.info(f"System event: {event.type}")
        ```
    """
def rpc(
    func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None,
    *,
    strict: bool = True,
    match: None | Callable[[ReceivesT, MessageContext], bool] = None,
) -> (
    Callable[
        [Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]],
        MessageHandler[AgentT, ReceivesT, ProducesT],
    ]
    | MessageHandler[AgentT, ReceivesT, ProducesT]
):
    """RPC handler decorator: handles RPC calls that require a return value.

    ``func`` is optional, so the decorator may be called with keyword
    options only; in that case the call has to yield a decorator rather
    than a handler, and the return annotation covers both forms.

    Only the signature and documentation are listed here; the
    implementation body is not shown in this listing.

    Args:
        func: The handler coroutine function being decorated, or ``None``
            when the decorator is invoked with keyword options only.
        strict: NOTE(review): presumably toggles strict type checking, as
            documented for ``message_handler`` -- confirm.
        match: Optional predicate over the message and its context that
            returns a bool. NOTE(review): presumably used for secondary
            routing, as in ``message_handler`` -- confirm.

    Returns:
        A ``MessageHandler`` when ``func`` is supplied; otherwise a
        decorator that accepts the handler function and returns a
        ``MessageHandler``.

    Example:
        ```python
        @rpc
        async def calculate(self, request: CalculationRequest, ctx: MessageContext) -> CalculationResponse:
            result = request.a + request.b
            return CalculationResponse(result=result)
        ```
    """
|