1. 性能优化最佳实践
1.1 预热机制 (Prewarming)
def prewarm(proc: JobProcess):
"""
预热函数 - 在进程启动时初始化重型组件
优势:
1. 减少首次会话的启动延迟
2. 避免重复加载模型
3. 提高资源利用效率
"""
# 预加载VAD模型(最常用的优化)
proc.userdata["vad"] = silero.VAD.load(
min_speech_duration=0.2,
min_silence_duration=0.6,
)
# 预加载其他重型组件
proc.userdata["tokenizer"] = load_tokenizer()
proc.userdata["embedding_model"] = load_embedding_model()
# 在WorkerOptions中启用预热
cli.run_app(WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm # 关键:启用预热
))
1.2 预先生成 (Preemptive Generation)
async def entrypoint(ctx: JobContext):
"""
预先生成配置 - 在用户说话时就开始LLM推理
性能提升:
- 减少响应延迟 30-50%
- 提高对话流畅度
- 更好的用户体验
"""
session = AgentSession(
vad=ctx.proc.userdata["vad"],
llm=openai.LLM(model="gpt-4o-mini"),
stt=deepgram.STT(model="nova-3", language="multi"),
tts=openai.TTS(voice="ash"),
# 性能优化关键配置
preemptive_generation=True, # 启用预先生成
resume_false_interruption=True, # 恢复错误中断
false_interruption_timeout=1.0, # 错误中断超时
min_interruption_duration=0.2, # 更敏感的中断检测
# 使用高级转换检测
turn_detection=MultilingualModel(),
)
1.3 指标收集和监控
async def entrypoint(ctx: JobContext):
"""
完整的指标收集和监控实现
监控内容:
1. 使用统计(token、请求数量)
2. 性能指标(延迟、错误率)
3. 成本分析
4. 用户行为分析
"""
# 设置日志上下文
ctx.log_context_fields = {
"room": ctx.room.name,
"session_id": generate_session_id(),
"user_id": extract_user_id(ctx),
}
# 创建指标收集器
usage_collector = metrics.UsageCollector()
performance_tracker = PerformanceTracker()
@session.on("metrics_collected")
def _on_metrics_collected(ev: MetricsCollectedEvent):
"""
实时指标处理
处理内容:
1. 记录详细指标
2. 检测异常情况
3. 触发告警
4. 更新仪表板
"""
# 记录指标
metrics.log_metrics(ev.metrics)
usage_collector.collect(ev.metrics)
# 性能分析
performance_tracker.track_latency(ev.metrics)
performance_tracker.track_errors(ev.metrics)
# 异常检测
if performance_tracker.detect_anomaly(ev.metrics):
logger.warning("检测到性能异常", extra=ev.metrics)
send_alert(ev.metrics)
async def log_usage():
"""会话结束时的统计汇总"""
summary = usage_collector.get_summary()
performance_summary = performance_tracker.get_summary()
logger.info(f"会话统计: {summary}")
logger.info(f"性能统计: {performance_summary}")
# 发送到监控系统
send_to_monitoring_system({
"usage": summary,
"performance": performance_summary,
"session_duration": performance_tracker.session_duration,
})
# 注册关闭回调
ctx.add_shutdown_callback(log_usage)
1.4 错误处理和恢复
class RobustAgent(Agent):
"""
健壮的代理实现 - 包含完整的错误处理机制
错误处理策略:
1. 分层错误处理
2. 自动重试机制
3. 优雅降级
4. 用户友好的错误消息
"""
def __init__(self):
super().__init__(
instructions="你是一个可靠的助手,即使遇到技术问题也能提供帮助。",
)
self.error_counts = defaultdict(int)
self.last_successful_response = None
@function_tool
async def resilient_api_call(
self,
context: RunContext,
query: str
) -> str:
"""
具有弹性的API调用示例
实现特性:
1. 指数退避重试
2. 熔断器模式
3. 降级响应
4. 详细错误日志
"""
max_retries = 3
base_delay = 1.0
for attempt in range(max_retries):
try:
# 尝试API调用
result = await external_api_call(query)
# 重置错误计数
self.error_counts["api_call"] = 0
self.last_successful_response = result
return result
except APITimeoutError as e:
self.error_counts["timeout"] += 1
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
logger.warning(f"API超时,{delay}秒后重试: {e}")
await asyncio.sleep(delay)
else:
return self._handle_timeout_fallback(query)
except APIRateLimitError as e:
self.error_counts["rate_limit"] += 1
if attempt < max_retries - 1:
# 速率限制时等待更长时间
delay = base_delay * (3 ** attempt)
logger.warning(f"API速率限制,{delay}秒后重试: {e}")
await asyncio.sleep(delay)
else:
return "抱歉,服务暂时繁忙,请稍后再试。"
except APIConnectionError as e:
self.error_counts["connection"] += 1
logger.error(f"API连接错误: {e}")
return self._handle_connection_fallback(query)
except Exception as e:
self.error_counts["unknown"] += 1
logger.error(f"未知错误: {e}", exc_info=True)
raise ToolError(f"服务暂时不可用,请稍后再试。")
def _handle_timeout_fallback(self, query: str) -> str:
"""超时降级处理"""
if self.last_successful_response:
return f"抱歉,服务响应较慢。基于之前的信息:{self.last_successful_response}"
return "抱歉,服务暂时响应较慢,请稍后再试。"
def _handle_connection_fallback(self, query: str) -> str:
"""连接错误降级处理"""
return "抱歉,无法连接到外部服务。我可以基于已有知识为您提供帮助。"
2. 架构设计最佳实践
2.1 多代理系统设计
@dataclass
class RestaurantUserData:
"""
餐厅系统用户数据结构
设计原则:
1. 数据结构清晰
2. 状态管理明确
3. 代理间数据共享
4. 审计跟踪
"""
# 基础信息
customer_name: str | None = None
customer_phone: str | None = None
# 业务数据
reservation_time: str | None = None
order: list[str] | None = None
# 支付信息
customer_credit_card: str | None = None
expense: float | None = None
checked_out: bool | None = None
# 代理管理
agents: dict[str, Agent] = field(default_factory=dict)
prev_agent: Agent | None = None
def summarize(self) -> str:
"""数据摘要 - 用于代理间上下文传递"""
data = {
"customer": {
"name": self.customer_name or "unknown",
"phone": self.customer_phone or "unknown",
},
"reservation": self.reservation_time or "none",
"order": self.order or [],
"payment": {
"expense": self.expense or 0,
"checked_out": self.checked_out or False,
}
}
return yaml.dump(data, default_flow_style=False)
class GreeterAgent(Agent):
"""
迎宾代理 - 多代理系统的入口点
职责:
1. 欢迎用户
2. 识别用户需求
3. 路由到专门代理
4. 收集基础信息
"""
def __init__(self, userdata: RestaurantUserData):
super().__init__(
instructions=f"""
你是餐厅的迎宾员。你的职责是:
1. 热情欢迎客户
2. 了解客户需求(预订、外卖、咨询)
3. 收集基础信息(姓名、电话)
4. 将客户转接给专门的服务代理
当前客户信息:
{userdata.summarize()}
""",
)
@function_tool
async def handoff_to_reservation(
self,
context: RunContext[RestaurantUserData],
reason: str = "客户需要预订服务"
) -> tuple[Agent, str]:
"""转接到预订代理"""
userdata = context.userdata
# 创建专门的预订代理
reservation_agent = ReservationAgent(userdata)
userdata.agents["reservation"] = reservation_agent
userdata.prev_agent = self
return reservation_agent, f"正在为您转接到预订服务。{reason}"
@function_tool
async def handoff_to_takeaway(
self,
context: RunContext[RestaurantUserData],
reason: str = "客户需要外卖服务"
) -> tuple[Agent, str]:
"""转接到外卖代理"""
userdata = context.userdata
takeaway_agent = TakeawayAgent(userdata)
userdata.agents["takeaway"] = takeaway_agent
userdata.prev_agent = self
return takeaway_agent, f"正在为您转接到外卖服务。{reason}"
class ReservationAgent(Agent):
"""
预订代理 - 处理餐桌预订业务
专门职责:
1. 收集预订信息
2. 检查可用性
3. 确认预订
4. 发送确认信息
"""
def __init__(self, userdata: RestaurantUserData):
super().__init__(
instructions=f"""
你是餐厅的预订专员。基于以下客户信息提供专业的预订服务:
{userdata.summarize()}
你的职责:
1. 确认客户预订需求(日期、时间、人数)
2. 检查餐桌可用性
3. 收集必要信息(姓名、电话)
4. 确认预订并提供预订号
""",
)
@function_tool
async def check_availability(
self,
context: RunContext[RestaurantUserData],
date: str,
time: str,
party_size: int
) -> str:
"""检查餐桌可用性"""
# 模拟数据库查询
available = await check_table_availability(date, time, party_size)
if available:
return f"{date} {time} 有适合 {party_size} 人的餐桌可预订。"
else:
# 提供替代时间
alternatives = await get_alternative_times(date, party_size)
return f"{date} {time} 已满,推荐时间:{', '.join(alternatives)}"
@function_tool
async def confirm_reservation(
self,
context: RunContext[RestaurantUserData],
date: str,
time: str,
party_size: int
) -> str:
"""确认预订"""
userdata = context.userdata
# 生成预订号
reservation_id = generate_reservation_id()
# 保存预订信息
userdata.reservation_time = f"{date} {time}"
# 发送确认
confirmation = await send_reservation_confirmation(
name=userdata.customer_name,
phone=userdata.customer_phone,
reservation_id=reservation_id,
date=date,
time=time,
party_size=party_size
)
return f"预订已确认!预订号:{reservation_id}。确认信息已发送到您的手机。"
2.2 复杂业务流程设计
class DriveThruAgent(Agent):
"""
汽车餐厅代理 - 复杂订单处理系统
设计特点:
1. 状态机驱动
2. 动态工具生成
3. 数据验证
4. 业务规则引擎
"""
def __init__(self, userdata: DriveThruUserdata):
# 动态生成指令 - 包含菜单信息
instructions = self._build_dynamic_instructions(userdata)
# 动态生成工具 - 基于可用菜单项
tools = self._build_dynamic_tools(userdata)
super().__init__(
instructions=instructions,
tools=tools,
)
self.userdata = userdata
def _build_dynamic_instructions(self, userdata: DriveThruUserdata) -> str:
"""
动态构建指令 - 基于当前菜单和促销信息
优势:
1. 指令始终与业务数据同步
2. 支持动态促销和菜单更新
3. 个性化服务体验
"""
base_instructions = """
你是汽车餐厅的服务员。你需要:
1. 热情欢迎客户
2. 介绍今日特色和促销
3. 帮助客户完成订单
4. 确认订单详情和价格
5. 处理支付和取餐安排
"""
# 添加菜单信息
menu_info = ""
for category, items in userdata.menu_items.items():
menu_info += f"\n\n{category.upper()}菜单:\n"
for item in items:
menu_info += f"- {item.name} (ID: {item.id}): ${item.price}\n"
# 添加促销信息
promotions = get_current_promotions()
if promotions:
promo_info = "\n\n今日促销:\n"
for promo in promotions:
promo_info += f"- {promo.description}\n"
else:
promo_info = ""
return base_instructions + menu_info + promo_info
def _build_dynamic_tools(self, userdata: DriveThruUserdata) -> list[FunctionTool]:
"""
动态构建工具函数 - 基于可用菜单项
设计模式:
1. 工厂模式创建工具
2. 运行时类型检查
3. 业务规则验证
4. 错误处理包装
"""
tools = []
# 为每种菜单类型创建订单工具
if userdata.combo_items:
tools.append(self._build_combo_order_tool(userdata))
if userdata.regular_items:
tools.append(self._build_regular_order_tool(userdata))
if userdata.happy_items:
tools.append(self._build_happy_meal_tool(userdata))
# 添加通用工具
tools.extend([
self._build_modify_order_tool(),
self._build_checkout_tool(),
self._build_cancel_order_tool(),
])
return tools
def _build_combo_order_tool(self, userdata: DriveThruUserdata) -> FunctionTool:
"""构建套餐订单工具"""
available_combo_ids = {item.id for item in userdata.combo_items}
available_drink_ids = {item.id for item in userdata.drink_items}
available_sauce_ids = {item.id for item in userdata.sauce_items}
@function_tool
async def order_combo_meal(
ctx: RunContext[DriveThruUserdata],
meal_id: Annotated[str, Field(
description="套餐ID",
json_schema_extra={"enum": list(available_combo_ids)}
)],
drink_id: Annotated[str, Field(
description="饮料ID",
json_schema_extra={"enum": list(available_drink_ids)}
)],
sauce_ids: Annotated[list[str], Field(
description="酱料ID列表",
json_schema_extra={"items": {"enum": list(available_sauce_ids)}}
)] = [],
special_requests: str = "",
) -> str:
"""
套餐订单处理函数
业务逻辑:
1. 验证菜单项可用性
2. 检查库存状态
3. 计算价格(含促销)
4. 更新订单状态
5. 返回确认信息
"""
try:
# 验证输入
meal_item = find_item_by_id(userdata.combo_items, meal_id)
drink_item = find_item_by_id(userdata.drink_items, drink_id)
sauce_items = find_items_by_id(userdata.sauce_items, sauce_ids)
if not meal_item:
raise ToolError(f"套餐 {meal_id} 不存在")
if not drink_item:
raise ToolError(f"饮料 {drink_id} 不存在")
# 检查库存
if not await check_inventory(meal_id, drink_id, sauce_ids):
raise ToolError("抱歉,部分商品库存不足")
# 创建订单项
combo_order = OrderedCombo(
meal=meal_item,
drink=drink_item,
sauces=sauce_items,
special_requests=special_requests,
)
# 计算价格(应用促销)
base_price = combo_order.calculate_price()
discount = apply_promotions(combo_order)
final_price = base_price - discount
# 更新订单
ctx.userdata.order.add_item(combo_order)
ctx.userdata.order.total_price += final_price
# 返回确认
confirmation = f"""
已添加套餐:
- {meal_item.name}
- {drink_item.name}
- 酱料:{', '.join(s.name for s in sauce_items)}
"""
if special_requests:
confirmation += f"\n- 特殊要求:{special_requests}"
if discount > 0:
confirmation += f"\n- 原价:${base_price:.2f}"
confirmation += f"\n- 优惠:-${discount:.2f}"
confirmation += f"\n- 小计:${final_price:.2f}"
confirmation += f"\n\n当前订单总额:${ctx.userdata.order.total_price:.2f}"
return confirmation
except Exception as e:
logger.error(f"套餐订单处理失败: {e}", exc_info=True)
raise ToolError(f"订单处理失败:{str(e)}")
return order_combo_meal
3. 用户体验优化
3.1 自然对话设计
class NaturalConversationAgent(Agent):
"""
自然对话代理 - 优化用户体验的设计模式
设计原则:
1. 对话式交互
2. 上下文感知
3. 个性化响应
4. 错误容忍
"""
def __init__(self):
super().__init__(
instructions="""
你是一个自然、友好的AI助手。对话风格要求:
1. 自然对话:
- 使用日常对话语言,避免机械化表达
- 适当使用语气词和连接词
- 根据上下文调整语调
2. 上下文感知:
- 记住之前的对话内容
- 理解隐含的意图和情感
- 避免重复询问已知信息
3. 错误处理:
- 优雅处理模糊或不完整的输入
- 主动澄清歧义
- 提供有用的建议和选项
4. 个性化:
- 根据用户偏好调整回应
- 记住用户的习惯和选择
- 提供相关的个性化建议
""",
)
async def on_user_turn_completed(
self,
turn_ctx: ChatContext,
new_message: ChatMessage
) -> None:
"""
用户回合完成处理 - 实现上下文感知和智能响应
处理逻辑:
1. 分析用户意图和情感
2. 检查对话历史和模式
3. 准备个性化响应策略
4. 处理特殊情况(如沉默、重复等)
"""
user_input = new_message.text_content or ""
# 情感分析
sentiment = await analyze_sentiment(user_input)
# 意图识别
intent = await classify_intent(user_input, turn_ctx.messages)
# 检查对话模式
conversation_pattern = analyze_conversation_pattern(turn_ctx.messages)
# 个性化策略
if sentiment == "frustrated":
# 用户沮丧时的处理
self.session.generate_reply(
instructions="用户似乎有些沮丧,请用同理心回应并主动提供帮助。"
)
elif conversation_pattern == "repetitive":
# 重复对话的处理
self.session.generate_reply(
instructions="用户可能在重复同样的问题,尝试用不同的方式解释或提供替代方案。"
)
elif intent == "unclear":
# 意图不明确时的处理
self.session.generate_reply(
instructions="用户的意图不够清晰,请友善地要求澄清,并提供一些可能的选项。"
)
else:
# 正常对话流程
self.session.generate_reply()
@function_tool
async def smart_search(
self,
context: RunContext,
query: str,
search_type: Literal["web", "knowledge_base", "faq"] = "knowledge_base"
) -> str:
"""
智能搜索工具 - 提供上下文感知的搜索结果
特性:
1. 多源搜索整合
2. 结果排序和过滤
3. 个性化推荐
4. 相关性评分
"""
# 扩展查询上下文
expanded_query = await expand_query_with_context(
query,
context.session.history
)
# 多源搜索
results = await parallel_search(
query=expanded_query,
sources=[search_type, "related_topics"],
user_profile=context.userdata.get("profile", {})
)
# 结果整合和排序
ranked_results = rank_results_by_relevance(
results,
query,
context.session.history
)
# 格式化响应
if ranked_results:
response = format_search_results(ranked_results[:3]) # 前3个结果
# 添加相关建议
related_queries = generate_related_queries(query, ranked_results)
if related_queries:
response += f"\n\n您可能还想了解:{', '.join(related_queries)}"
return response
else:
return "抱歉,没有找到相关信息。您能提供更多详细信息吗?"
3.2 背景音频和氛围营造
async def entrypoint(ctx: JobContext):
"""
完整的背景音频配置 - 提升用户体验
背景音频功能:
1. 营造氛围
2. 掩盖技术噪音
3. 提供听觉反馈
4. 增强沉浸感
"""
session = AgentSession(
vad=ctx.proc.userdata["vad"],
llm=openai.LLM(model="gpt-4o-mini"),
stt=deepgram.STT(model="nova-3"),
tts=openai.TTS(voice="ash"),
)
# 配置背景音频
background_audio = BackgroundAudioPlayer(
# 思考时的背景音
thinking_audio=AudioConfig(
file_path="assets/thinking-ambience.ogg",
volume=0.3,
loop=True,
fade_in_duration=0.5,
fade_out_duration=0.5,
),
# 打字音效
typing_audio=AudioConfig(
file_path="assets/keyboard-typing.ogg",
volume=0.2,
loop=True,
),
# 办公室环境音
ambient_audio=AudioConfig(
file_path="assets/office-ambience.ogg",
volume=0.1,
loop=True,
continuous=True, # 持续播放
),
)
# 启动会话和背景音频
await session.start(agent=MyAgent(), room=ctx.room)
await background_audio.start(room=ctx.room, agent_session=session)
# 背景音频会根据代理状态自动切换:
# - listening: 播放环境音
# - thinking: 播放思考音 + 环境音
# - speaking: 停止所有背景音
4. 安全和隐私最佳实践
4.1 敏感数据处理
class SecureAgent(Agent):
"""
安全代理 - 处理敏感信息的最佳实践
安全措施:
1. 数据脱敏
2. 访问控制
3. 审计日志
4. 合规检查
"""
def __init__(self):
super().__init__(
instructions="""
你是一个安全的AI助手。处理敏感信息时:
1. 不要在响应中显示完整的敏感信息
2. 使用掩码显示(如:****-1234)
3. 提醒用户注意隐私保护
4. 遵守数据保护法规
""",
)
@function_tool
async def process_payment_info(
self,
context: RunContext,
card_number: Annotated[str, Field(description="信用卡号码")],
expiry_date: Annotated[str, Field(description="有效期")],
cvv: Annotated[str, Field(description="安全码")],
) -> str:
"""
安全的支付信息处理
安全措施:
1. 立即加密敏感数据
2. 不在日志中记录原始数据
3. 使用安全的存储方式
4. 实现审计跟踪
"""
# 输入验证
if not validate_card_number(card_number):
raise ToolError("信用卡号码格式无效")
if not validate_expiry_date(expiry_date):
raise ToolError("有效期格式无效")
if not validate_cvv(cvv):
raise ToolError("安全码格式无效")
# 数据脱敏用于日志
masked_card = mask_card_number(card_number)
# 记录审计日志(不包含敏感信息)
audit_log = {
"action": "payment_info_processed",
"user_id": context.userdata.get("user_id"),
"masked_card": masked_card,
"timestamp": datetime.utcnow().isoformat(),
"session_id": context.session.session_id,
}
logger.info("支付信息处理", extra=audit_log)
try:
# 加密存储敏感信息
encrypted_data = encrypt_payment_info({
"card_number": card_number,
"expiry_date": expiry_date,
"cvv": cvv,
})
# 安全存储
payment_token = await store_encrypted_payment_info(
encrypted_data,
context.userdata.get("user_id")
)
# 处理支付
result = await process_payment_securely(payment_token)
return f"支付信息已安全处理。卡号:{masked_card},处理结果:{result}"
except Exception as e:
# 安全错误处理
logger.error("支付处理失败", extra={
"error": str(e),
"masked_card": masked_card,
"user_id": context.userdata.get("user_id"),
})
raise ToolError("支付处理失败,请检查信息后重试。")
finally:
# 清理内存中的敏感数据
clear_sensitive_variables(card_number, cvv)
4.2 访问控制和权限管理
class RoleBasedAgent(Agent):
"""
基于角色的访问控制代理
权限模型:
1. 角色定义
2. 权限检查
3. 操作审计
4. 会话隔离
"""
def __init__(self, user_role: str):
self.user_role = user_role
self.permissions = get_role_permissions(user_role)
super().__init__(
instructions=f"""
你是一个具有角色权限控制的助手。
当前用户角色:{user_role}
可用权限:{', '.join(self.permissions)}
严格按照权限执行操作,拒绝未授权的请求。
""",
)
def require_permission(self, permission: str):
"""权限检查装饰器"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
if permission not in self.permissions:
logger.warning(f"权限拒绝: 用户角色 {self.user_role} 尝试执行 {permission}")
raise ToolError(f"权限不足,需要 {permission} 权限")
return await func(*args, **kwargs)
return wrapper
return decorator
@function_tool
@require_permission("read_user_data")
async def get_user_info(
self,
context: RunContext,
user_id: str
) -> str:
"""获取用户信息 - 需要读取权限"""
user_info = await fetch_user_info(user_id)
# 根据权限过滤敏感信息
filtered_info = filter_user_info_by_permission(
user_info,
self.permissions
)
return json.dumps(filtered_info, ensure_ascii=False)
@function_tool
@require_permission("admin_operations")
async def delete_user_data(
self,
context: RunContext,
user_id: str
) -> str:
"""删除用户数据 - 需要管理员权限"""
# 额外的安全检查
if not await verify_admin_operation(
context.userdata.get("user_id"),
"delete_user_data"
):
raise ToolError("管理员操作需要额外验证")
# 执行删除操作
await delete_user_data_securely(user_id)
# 记录审计日志
audit_log = {
"action": "user_data_deleted",
"target_user": user_id,
"operator": context.userdata.get("user_id"),
"timestamp": datetime.utcnow().isoformat(),
}
logger.info("管理员操作", extra=audit_log)
return f"用户 {user_id} 的数据已安全删除"
5. 测试和质量保证
5.1 自动化测试框架
import pytest
from livekit.agents import AgentSession
from livekit.agents.voice.run_result import RunResult
class TestVoiceAgent:
"""
语音代理自动化测试套件
测试范围:
1. 功能测试
2. 性能测试
3. 错误处理测试
4. 集成测试
"""
@pytest.mark.asyncio
async def test_basic_conversation(self):
"""基础对话测试"""
async with AgentSession(llm=openai.LLM()) as session:
await session.start(MyAgent())
# 测试用户输入处理
result = await session.run(
user_input="Hello, how are you today?"
)
# 验证响应
result.expect.next_event().is_message(role="assistant")
await result.expect.next_event().judge(
llm=openai.LLM(),
intent="assistant should greet the user and ask about their needs"
)
@pytest.mark.asyncio
async def test_tool_execution(self):
"""工具执行测试"""
async with AgentSession(llm=openai.LLM()) as session:
await session.start(WeatherAgent())
result = await session.run(
user_input="What's the weather like in New York?"
)
# 验证工具调用序列
result.expect.skip_next_event_if(type="message", role="assistant")
result.expect.next_event().is_function_call(name="get_weather")
result.expect.next_event().is_function_call_output()
# 验证最终响应
await (
result.expect.next_event()
.is_message(role="assistant")
.judge(
llm=openai.LLM(),
intent="assistant should provide weather information for New York"
)
)
@pytest.mark.asyncio
async def test_error_handling(self):
"""错误处理测试"""
async with AgentSession(llm=openai.LLM()) as session:
await session.start(RobustAgent())
# 测试API错误处理
with mock.patch('external_api_call', side_effect=APITimeoutError()):
result = await session.run(
user_input="Call the external API"
)
# 验证错误恢复
result.expect.next_event().is_message(role="assistant")
assert "抱歉" in result.events[-1].data.content
@pytest.mark.asyncio
async def test_performance_metrics(self):
"""性能指标测试"""
start_time = time.time()
async with AgentSession(
llm=openai.LLM(),
preemptive_generation=True
) as session:
await session.start(MyAgent())
result = await session.run(
user_input="Tell me a short joke"
)
response_time = time.time() - start_time
# 性能断言
assert response_time < 3.0, f"响应时间过长: {response_time}s"
# 验证预先生成效果
metrics = session.get_metrics()
assert metrics.get("preemptive_generation_used", False)
@pytest.mark.asyncio
async def test_multi_agent_handoff(self):
"""多代理切换测试"""
userdata = RestaurantUserData()
async with AgentSession(llm=openai.LLM()) as session:
await session.start(GreeterAgent(userdata))
# 测试代理切换
result = await session.run(
user_input="I'd like to make a reservation"
)
# 验证切换到预订代理
result.expect.next_event().is_agent_handoff(
to_agent_type="ReservationAgent"
)
5.2 性能基准测试
class PerformanceBenchmark:
"""
性能基准测试套件
测试指标:
1. 响应延迟
2. 吞吐量
3. 资源使用率
4. 并发能力
"""
async def benchmark_response_latency(self):
"""响应延迟基准测试"""
latencies = []
for i in range(100):
start_time = time.time()
async with AgentSession(
llm=openai.LLM(model="gpt-4o-mini"),
preemptive_generation=True
) as session:
await session.start(MyAgent())
await session.run(user_input="Hello")
latency = time.time() - start_time
latencies.append(latency)
# 统计分析
avg_latency = sum(latencies) / len(latencies)
p95_latency = sorted(latencies)[95]
p99_latency = sorted(latencies)[99]
print(f"平均延迟: {avg_latency:.2f}s")
print(f"P95延迟: {p95_latency:.2f}s")
print(f"P99延迟: {p99_latency:.2f}s")
# 性能断言
assert avg_latency < 2.0, f"平均延迟过高: {avg_latency}s"
assert p95_latency < 3.0, f"P95延迟过高: {p95_latency}s"
async def benchmark_concurrent_sessions(self):
"""并发会话基准测试"""
concurrent_sessions = 10
tasks = []
for i in range(concurrent_sessions):
task = asyncio.create_task(self._run_session(f"user_{i}"))
tasks.append(task)
start_time = time.time()
results = await asyncio.gather(*tasks)
total_time = time.time() - start_time
# 分析结果
successful_sessions = sum(1 for r in results if r.success)
throughput = successful_sessions / total_time
print(f"并发会话数: {concurrent_sessions}")
print(f"成功会话数: {successful_sessions}")
print(f"吞吐量: {throughput:.2f} sessions/s")
assert successful_sessions >= concurrent_sessions * 0.95 # 95%成功率
这个最佳实践文档涵盖了LiveKit Agents框架的核心优化策略、架构设计模式、用户体验提升、安全实践和测试方法。每个部分都包含了详细的代码示例和实际应用场景,帮助开发者构建高质量的语音AI应用。