1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
| class StateMachineAgent(RoutedAgent):
"""状态机代理 - 基于状态机的复杂业务流程管理"""
def __init__(self, name: str, state_machine_config: StateMachineConfig):
super().__init__(f"状态机代理: {name}")
self.state_machine = StateMachine(state_machine_config)
self.current_state = state_machine_config.initial_state
self.state_history = []
self.transition_handlers = {}
@rpc
async def trigger_transition(
self,
trigger: StateTrigger,
ctx: MessageContext
) -> StateTransitionResult:
"""触发状态转换"""
transition_id = str(uuid.uuid4())
start_time = datetime.utcnow()
try:
# 1. 验证转换合法性
valid_transitions = self.state_machine.get_valid_transitions(self.current_state)
target_state = None
for transition in valid_transitions:
if transition.trigger == trigger.trigger_type:
target_state = transition.target_state
break
if not target_state:
return StateTransitionResult(
success=False,
error=f"从状态 {self.current_state} 无法通过触发器 {trigger.trigger_type} 进行转换"
)
# 2. 执行转换前处理
pre_transition_result = await self._execute_pre_transition(
self.current_state, target_state, trigger
)
if not pre_transition_result.success:
return StateTransitionResult(
success=False,
error=f"转换前处理失败: {pre_transition_result.error}"
)
# 3. 执行状态转换
old_state = self.current_state
self.current_state = target_state
# 4. 执行转换后处理
post_transition_result = await self._execute_post_transition(
old_state, target_state, trigger
)
# 5. 记录状态历史
state_change = StateChange(
transition_id=transition_id,
from_state=old_state,
to_state=target_state,
trigger=trigger,
timestamp=start_time,
context=trigger.context
)
self.state_history.append(state_change)
# 6. 发布状态变更事件
await self.publish_message(
StateChangedEvent(
agent_id=self.id,
state_change=state_change
),
TopicId("agent.state.changed", str(self.id))
)
return StateTransitionResult(
success=True,
transition_id=transition_id,
from_state=old_state,
to_state=target_state,
duration=(datetime.utcnow() - start_time).total_seconds()
)
except Exception as e:
return StateTransitionResult(
success=False,
error=f"状态转换异常: {str(e)}"
)
async def _execute_pre_transition(
self,
from_state: str,
to_state: str,
trigger: StateTrigger
) -> TransitionResult:
"""执行转换前处理"""
handler_key = f"{from_state}_to_{to_state}_pre"
if handler_key in self.transition_handlers:
handler = self.transition_handlers[handler_key]
return await handler(trigger)
return TransitionResult(success=True)
async def _execute_post_transition(
self,
from_state: str,
to_state: str,
trigger: StateTrigger
) -> TransitionResult:
"""执行转换后处理"""
handler_key = f"{from_state}_to_{to_state}_post"
if handler_key in self.transition_handlers:
handler = self.transition_handlers[handler_key]
return await handler(trigger)
return TransitionResult(success=True)
# 状态机配置示例
order_state_machine_config = StateMachineConfig(
name="订单状态机",
initial_state="created",
states=[
"created", "paid", "processing", "shipped", "delivered", "cancelled"
],
transitions=[
StateTransition("created", "paid", "payment_completed"),
StateTransition("paid", "processing", "start_processing"),
StateTransition("processing", "shipped", "shipping_confirmed"),
StateTransition("shipped", "delivered", "delivery_confirmed"),
StateTransition("created", "cancelled", "order_cancelled"),
StateTransition("paid", "cancelled", "order_cancelled"),
]
)
class OrderManagementAgent(StateMachineAgent):
"""订单管理代理 - 使用状态机管理订单生命周期"""
def __init__(self):
super().__init__("订单管理", order_state_machine_config)
# 注册转换处理器
self.transition_handlers.update({
"created_to_paid_post": self._on_payment_completed,
"processing_to_shipped_post": self._on_shipping_confirmed,
"shipped_to_delivered_post": self._on_delivery_confirmed
})
async def _on_payment_completed(self, trigger: StateTrigger) -> TransitionResult:
"""支付完成后处理"""
# 通知库存系统开始处理
await self.send_message(
StartProcessingCommand(order_id=trigger.context['order_id']),
AgentId("InventoryAgent", "default")
)
return TransitionResult(success=True)
async def _on_shipping_confirmed(self, trigger: StateTrigger) -> TransitionResult:
"""发货确认后处理"""
# 发送发货通知
await self.send_message(
ShippingNotification(
order_id=trigger.context['order_id'],
tracking_number=trigger.context['tracking_number']
),
AgentId("NotificationAgent", "default")
)
return TransitionResult(success=True)
|