1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
| class CustomReplyStrategy:
"""自定义回复策略 - 基于消息内容智能路由"""
def __init__(self, priority_keywords: List[str], escalation_threshold: int = 3):
self.priority_keywords = priority_keywords
self.escalation_threshold = escalation_threshold
self.message_count = 0
async def __call__(
self,
recipient: Agent,
messages: List[BaseChatMessage],
sender: Agent,
config: Dict[str, Any]
) -> Tuple[bool, Optional[str]]:
"""
自定义回复策略实现
Args:
recipient: 接收消息的代理
messages: 消息历史列表
sender: 发送消息的代理
config: 配置参数
Returns:
Tuple[bool, Optional[str]]: (是否处理, 回复内容)
"""
if not messages:
return False, None
last_message = messages[-1]
content = last_message.content.lower()
# 1. 优先级关键词检测
for keyword in self.priority_keywords:
if keyword in content:
priority_response = await self._handle_priority_message(last_message, keyword)
return True, priority_response
# 2. 情感分析和适应性响应
sentiment = await self._analyze_sentiment(content)
if sentiment == "negative" and self.message_count > 2:
escalation_response = await self._escalate_to_human(last_message)
return True, escalation_response
# 3. 上下文感知响应
context = await self._extract_context(messages)
if context.get("requires_expert"):
expert_response = await self._route_to_expert(last_message, context)
return True, expert_response
self.message_count += 1
return False, None # 继续默认处理流程
async def _handle_priority_message(self, message: BaseChatMessage, keyword: str) -> str:
"""处理优先级消息"""
return f"检测到优先级关键词 '{keyword}',正在优先处理您的请求..."
async def _analyze_sentiment(self, content: str) -> str:
"""情感分析"""
# 简化的情感分析实现
negative_indicators = ["生气", "愤怒", "不满", "糟糕", "失望"]
if any(indicator in content for indicator in negative_indicators):
return "negative"
return "neutral"
async def _escalate_to_human(self, message: BaseChatMessage) -> str:
"""升级到人工处理"""
return "我注意到您可能遇到了一些困难,正在为您转接人工客服..."
async def _extract_context(self, messages: List[BaseChatMessage]) -> Dict[str, Any]:
"""提取对话上下文"""
context = {
"topic": None,
"requires_expert": False,
"user_intent": None
}
# 分析最近几条消息确定主题
recent_content = " ".join([msg.content for msg in messages[-3:]])
if any(tech_word in recent_content for tech_word in ["技术", "代码", "编程", "bug"]):
context["requires_expert"] = True
context["topic"] = "technical"
return context
# 使用示例
async def setup_custom_reply_strategy():
"""设置自定义回复策略示例"""
# 创建智能体
assistant = RoutedAgent("智能助手")
# 创建自定义策略
custom_strategy = CustomReplyStrategy(
priority_keywords=["紧急", "重要", "立即", "urgent"],
escalation_threshold=3
)
# 注册回复策略
assistant.register_reply_handler(
trigger_condition=lambda msg: True, # 触发条件
reply_function=custom_strategy,
priority=0 # 最高优先级
)
return assistant
|