1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
| class IntelligentMessageRouter:
"""智能消息路由器 - 基于内容和上下文的智能路由"""
def __init__(self):
self.routing_rules = RoutingRuleEngine()
self.route_cache = TTLCache(maxsize=50000, ttl=600) # 10分钟缓存
self.routing_history = deque(maxlen=1000) # 保留最近1000次路由记录
self.ml_router = MachineLearningRouter()
async def route_message(
self,
message: Any,
sender: Optional[AgentId],
routing_hint: Optional[RoutingHint] = None
) -> RoutingDecision:
"""
智能路由消息到最合适的代理
Args:
message: 要路由的消息
sender: 发送方代理ID
routing_hint: 路由提示信息
Returns:
RoutingDecision: 路由决策结果
"""
routing_context = RoutingContext(
message=message,
sender=sender,
timestamp=datetime.utcnow(),
hint=routing_hint
)
# 1. 快速规则路由
rule_decision = await self._apply_routing_rules(routing_context)
if rule_decision.confidence > 0.9:
await self._record_routing_decision(routing_context, rule_decision, 'rule_based')
return rule_decision
# 2. 基于历史的相似性路由
similarity_decision = await self._similarity_based_routing(routing_context)
if similarity_decision.confidence > 0.8:
await self._record_routing_decision(routing_context, similarity_decision, 'similarity_based')
return similarity_decision
# 3. 机器学习路由
ml_decision = await self.ml_router.predict_best_agent(routing_context)
if ml_decision.confidence > 0.7:
await self._record_routing_decision(routing_context, ml_decision, 'ml_based')
return ml_decision
# 4. 默认路由 (负载均衡)
default_decision = await self._default_load_balanced_routing(routing_context)
await self._record_routing_decision(routing_context, default_decision, 'default')
return default_decision
async def _apply_routing_rules(self, context: RoutingContext) -> RoutingDecision:
"""应用路由规则引擎"""
message_content = self._extract_message_content(context.message)
# 内容分析路由规则
content_rules = [
{
'pattern': r'天气|气温|温度',
'target_agent_type': 'WeatherAgent',
'confidence': 0.95
},
{
'pattern': r'计算|数学|运算',
'target_agent_type': 'CalculatorAgent',
'confidence': 0.95
},
{
'pattern': r'代码|编程|程序',
'target_agent_type': 'CoderAgent',
'confidence': 0.9
},
{
'pattern': r'翻译|translate',
'target_agent_type': 'TranslatorAgent',
'confidence': 0.9
}
]
# 应用规则匹配
for rule in content_rules:
if re.search(rule['pattern'], message_content, re.IGNORECASE):
return RoutingDecision(
target_agent_type=rule['target_agent_type'],
confidence=rule['confidence'],
reasoning=f"匹配规则: {rule['pattern']}",
routing_strategy='rule_based'
)
# 发送方路由规则
if context.sender:
sender_rules = await self._get_sender_based_rules(context.sender.type)
for rule in sender_rules:
if rule.matches(context):
return RoutingDecision(
target_agent_type=rule.target_type,
confidence=rule.confidence,
reasoning=f"发送方规则: {rule.description}",
routing_strategy='sender_based'
)
# 无匹配规则
return RoutingDecision(
target_agent_type=None,
confidence=0.0,
reasoning="无匹配的路由规则"
)
async def _similarity_based_routing(self, context: RoutingContext) -> RoutingDecision:
"""基于历史相似性的路由"""
# 1. 提取消息特征
message_features = await self._extract_message_features(context.message)
# 2. 在历史记录中查找相似消息
similar_records = []
for historical_record in self.routing_history:
similarity = await self._calculate_similarity(message_features, historical_record.features)
if similarity > 0.7: # 相似度阈值
similar_records.append((historical_record, similarity))
if not similar_records:
return RoutingDecision(confidence=0.0, reasoning="无相似历史记录")
# 3. 按相似度排序
similar_records.sort(key=lambda x: x[1], reverse=True)
# 4. 分析最相似记录的路由结果
most_similar_record, similarity = similar_records[0]
if most_similar_record.routing_success_rate > 0.8:
return RoutingDecision(
target_agent_type=most_similar_record.target_agent_type,
confidence=similarity * most_similar_record.routing_success_rate,
reasoning=f"基于相似度 {similarity:.2f} 的历史记录",
routing_strategy='similarity_based'
)
return RoutingDecision(confidence=0.0, reasoning="相似记录成功率过低")
async def update_routing_feedback(
self,
routing_id: str,
success: bool,
response_time: float,
user_satisfaction: Optional[float] = None
) -> None:
"""更新路由反馈信息,用于优化路由算法"""
# 1. 查找路由记录
routing_record = await self._find_routing_record(routing_id)
if not routing_record:
return
# 2. 更新反馈信息
routing_record.success = success
routing_record.response_time = response_time
routing_record.user_satisfaction = user_satisfaction
routing_record.feedback_updated_at = datetime.utcnow()
# 3. 更新路由成功率统计
await self._update_routing_success_rate(
routing_record.target_agent_type,
routing_record.routing_strategy,
success
)
# 4. 训练机器学习模型
if len(self.routing_history) % 100 == 0: # 每100次反馈重训练一次
await self.ml_router.retrain_model(list(self.routing_history))
|