1. 性能优化最佳实践

1.1 预热机制 (Prewarming)

def prewarm(proc: JobProcess):
    """
    预热函数 - 在进程启动时初始化重型组件
    
    优势:
    1. 减少首次会话的启动延迟
    2. 避免重复加载模型
    3. 提高资源利用效率
    """
    # 预加载VAD模型(最常用的优化)
    proc.userdata["vad"] = silero.VAD.load(
        min_speech_duration=0.2,
        min_silence_duration=0.6,
    )
    
    # 预加载其他重型组件
    proc.userdata["tokenizer"] = load_tokenizer()
    proc.userdata["embedding_model"] = load_embedding_model()

# 在WorkerOptions中启用预热
cli.run_app(WorkerOptions(
    entrypoint_fnc=entrypoint,
    prewarm_fnc=prewarm  # 关键:启用预热
))

1.2 预先生成 (Preemptive Generation)

async def entrypoint(ctx: JobContext):
    """
    预先生成配置 - 在用户说话时就开始LLM推理
    
    性能提升:
    - 减少响应延迟 30-50%
    - 提高对话流畅度
    - 更好的用户体验
    """
    session = AgentSession(
        vad=ctx.proc.userdata["vad"],
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3", language="multi"),
        tts=openai.TTS(voice="ash"),
        
        # 性能优化关键配置
        preemptive_generation=True,              # 启用预先生成
        resume_false_interruption=True,          # 恢复错误中断
        false_interruption_timeout=1.0,          # 错误中断超时
        min_interruption_duration=0.2,          # 更敏感的中断检测
        
        # 使用高级转换检测
        turn_detection=MultilingualModel(),
    )

1.3 指标收集和监控

async def entrypoint(ctx: JobContext):
    """
    完整的指标收集和监控实现
    
    监控内容:
    1. 使用统计(token、请求数量)
    2. 性能指标(延迟、错误率)
    3. 成本分析
    4. 用户行为分析
    """
    # 设置日志上下文
    ctx.log_context_fields = {
        "room": ctx.room.name,
        "session_id": generate_session_id(),
        "user_id": extract_user_id(ctx),
    }
    
    # 创建指标收集器
    usage_collector = metrics.UsageCollector()
    performance_tracker = PerformanceTracker()
    
    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        """
        实时指标处理
        
        处理内容:
        1. 记录详细指标
        2. 检测异常情况
        3. 触发告警
        4. 更新仪表板
        """
        # 记录指标
        metrics.log_metrics(ev.metrics)
        usage_collector.collect(ev.metrics)
        
        # 性能分析
        performance_tracker.track_latency(ev.metrics)
        performance_tracker.track_errors(ev.metrics)
        
        # 异常检测
        if performance_tracker.detect_anomaly(ev.metrics):
            logger.warning("检测到性能异常", extra=ev.metrics)
            send_alert(ev.metrics)
    
    async def log_usage():
        """会话结束时的统计汇总"""
        summary = usage_collector.get_summary()
        performance_summary = performance_tracker.get_summary()
        
        logger.info(f"会话统计: {summary}")
        logger.info(f"性能统计: {performance_summary}")
        
        # 发送到监控系统
        send_to_monitoring_system({
            "usage": summary,
            "performance": performance_summary,
            "session_duration": performance_tracker.session_duration,
        })
    
    # 注册关闭回调
    ctx.add_shutdown_callback(log_usage)

1.4 错误处理和恢复

class RobustAgent(Agent):
    """
    健壮的代理实现 - 包含完整的错误处理机制
    
    错误处理策略:
    1. 分层错误处理
    2. 自动重试机制
    3. 优雅降级
    4. 用户友好的错误消息
    """
    
    def __init__(self):
        super().__init__(
            instructions="你是一个可靠的助手,即使遇到技术问题也能提供帮助。",
        )
        self.error_counts = defaultdict(int)
        self.last_successful_response = None
    
    @function_tool
    async def resilient_api_call(
        self, 
        context: RunContext, 
        query: str
    ) -> str:
        """
        具有弹性的API调用示例
        
        实现特性:
        1. 指数退避重试
        2. 熔断器模式
        3. 降级响应
        4. 详细错误日志
        """
        max_retries = 3
        base_delay = 1.0
        
        for attempt in range(max_retries):
            try:
                # 尝试API调用
                result = await external_api_call(query)
                
                # 重置错误计数
                self.error_counts["api_call"] = 0
                self.last_successful_response = result
                
                return result
                
            except APITimeoutError as e:
                self.error_counts["timeout"] += 1
                if attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"API超时,{delay}秒后重试: {e}")
                    await asyncio.sleep(delay)
                else:
                    return self._handle_timeout_fallback(query)
                    
            except APIRateLimitError as e:
                self.error_counts["rate_limit"] += 1
                if attempt < max_retries - 1:
                    # 速率限制时等待更长时间
                    delay = base_delay * (3 ** attempt)
                    logger.warning(f"API速率限制,{delay}秒后重试: {e}")
                    await asyncio.sleep(delay)
                else:
                    return "抱歉,服务暂时繁忙,请稍后再试。"
                    
            except APIConnectionError as e:
                self.error_counts["connection"] += 1
                logger.error(f"API连接错误: {e}")
                return self._handle_connection_fallback(query)
                
            except Exception as e:
                self.error_counts["unknown"] += 1
                logger.error(f"未知错误: {e}", exc_info=True)
                raise ToolError(f"服务暂时不可用,请稍后再试。")
    
    def _handle_timeout_fallback(self, query: str) -> str:
        """超时降级处理"""
        if self.last_successful_response:
            return f"抱歉,服务响应较慢。基于之前的信息:{self.last_successful_response}"
        return "抱歉,服务暂时响应较慢,请稍后再试。"
    
    def _handle_connection_fallback(self, query: str) -> str:
        """连接错误降级处理"""
        return "抱歉,无法连接到外部服务。我可以基于已有知识为您提供帮助。"

2. 架构设计最佳实践

2.1 多代理系统设计

@dataclass
class RestaurantUserData:
    """
    餐厅系统用户数据结构
    
    设计原则:
    1. 数据结构清晰
    2. 状态管理明确
    3. 代理间数据共享
    4. 审计跟踪
    """
    # 基础信息
    customer_name: str | None = None
    customer_phone: str | None = None
    
    # 业务数据
    reservation_time: str | None = None
    order: list[str] | None = None
    
    # 支付信息
    customer_credit_card: str | None = None
    expense: float | None = None
    checked_out: bool | None = None
    
    # 代理管理
    agents: dict[str, Agent] = field(default_factory=dict)
    prev_agent: Agent | None = None
    
    def summarize(self) -> str:
        """数据摘要 - 用于代理间上下文传递"""
        data = {
            "customer": {
                "name": self.customer_name or "unknown",
                "phone": self.customer_phone or "unknown",
            },
            "reservation": self.reservation_time or "none",
            "order": self.order or [],
            "payment": {
                "expense": self.expense or 0,
                "checked_out": self.checked_out or False,
            }
        }
        return yaml.dump(data, default_flow_style=False)

class GreeterAgent(Agent):
    """
    迎宾代理 - 多代理系统的入口点
    
    职责:
    1. 欢迎用户
    2. 识别用户需求
    3. 路由到专门代理
    4. 收集基础信息
    """
    
    def __init__(self, userdata: RestaurantUserData):
        super().__init__(
            instructions=f"""
            你是餐厅的迎宾员。你的职责是:
            1. 热情欢迎客户
            2. 了解客户需求(预订、外卖、咨询)
            3. 收集基础信息(姓名、电话)
            4. 将客户转接给专门的服务代理
            
            当前客户信息:
            {userdata.summarize()}
            """,
        )
    
    @function_tool
    async def handoff_to_reservation(
        self, 
        context: RunContext[RestaurantUserData],
        reason: str = "客户需要预订服务"
    ) -> tuple[Agent, str]:
        """转接到预订代理"""
        userdata = context.userdata
        
        # 创建专门的预订代理
        reservation_agent = ReservationAgent(userdata)
        userdata.agents["reservation"] = reservation_agent
        userdata.prev_agent = self
        
        return reservation_agent, f"正在为您转接到预订服务。{reason}"
    
    @function_tool
    async def handoff_to_takeaway(
        self, 
        context: RunContext[RestaurantUserData],
        reason: str = "客户需要外卖服务"
    ) -> tuple[Agent, str]:
        """转接到外卖代理"""
        userdata = context.userdata
        
        takeaway_agent = TakeawayAgent(userdata)
        userdata.agents["takeaway"] = takeaway_agent
        userdata.prev_agent = self
        
        return takeaway_agent, f"正在为您转接到外卖服务。{reason}"

class ReservationAgent(Agent):
    """
    预订代理 - 处理餐桌预订业务
    
    专门职责:
    1. 收集预订信息
    2. 检查可用性
    3. 确认预订
    4. 发送确认信息
    """
    
    def __init__(self, userdata: RestaurantUserData):
        super().__init__(
            instructions=f"""
            你是餐厅的预订专员。基于以下客户信息提供专业的预订服务:
            
            {userdata.summarize()}
            
            你的职责:
            1. 确认客户预订需求(日期、时间、人数)
            2. 检查餐桌可用性
            3. 收集必要信息(姓名、电话)
            4. 确认预订并提供预订号
            """,
        )
    
    @function_tool
    async def check_availability(
        self,
        context: RunContext[RestaurantUserData],
        date: str,
        time: str,
        party_size: int
    ) -> str:
        """检查餐桌可用性"""
        # 模拟数据库查询
        available = await check_table_availability(date, time, party_size)
        
        if available:
            return f"{date} {time} 有适合 {party_size} 人的餐桌可预订。"
        else:
            # 提供替代时间
            alternatives = await get_alternative_times(date, party_size)
            return f"{date} {time} 已满,推荐时间:{', '.join(alternatives)}"
    
    @function_tool
    async def confirm_reservation(
        self,
        context: RunContext[RestaurantUserData],
        date: str,
        time: str,
        party_size: int
    ) -> str:
        """确认预订"""
        userdata = context.userdata
        
        # 生成预订号
        reservation_id = generate_reservation_id()
        
        # 保存预订信息
        userdata.reservation_time = f"{date} {time}"
        
        # 发送确认
        confirmation = await send_reservation_confirmation(
            name=userdata.customer_name,
            phone=userdata.customer_phone,
            reservation_id=reservation_id,
            date=date,
            time=time,
            party_size=party_size
        )
        
        return f"预订已确认!预订号:{reservation_id}。确认信息已发送到您的手机。"

2.2 复杂业务流程设计

class DriveThruAgent(Agent):
    """
    汽车餐厅代理 - 复杂订单处理系统
    
    设计特点:
    1. 状态机驱动
    2. 动态工具生成
    3. 数据验证
    4. 业务规则引擎
    """
    
    def __init__(self, userdata: DriveThruUserdata):
        # 动态生成指令 - 包含菜单信息
        instructions = self._build_dynamic_instructions(userdata)
        
        # 动态生成工具 - 基于可用菜单项
        tools = self._build_dynamic_tools(userdata)
        
        super().__init__(
            instructions=instructions,
            tools=tools,
        )
        
        self.userdata = userdata
    
    def _build_dynamic_instructions(self, userdata: DriveThruUserdata) -> str:
        """
        动态构建指令 - 基于当前菜单和促销信息
        
        优势:
        1. 指令始终与业务数据同步
        2. 支持动态促销和菜单更新
        3. 个性化服务体验
        """
        base_instructions = """
        你是汽车餐厅的服务员。你需要:
        1. 热情欢迎客户
        2. 介绍今日特色和促销
        3. 帮助客户完成订单
        4. 确认订单详情和价格
        5. 处理支付和取餐安排
        """
        
        # 添加菜单信息
        menu_info = ""
        for category, items in userdata.menu_items.items():
            menu_info += f"\n\n{category.upper()}菜单:\n"
            for item in items:
                menu_info += f"- {item.name} (ID: {item.id}): ${item.price}\n"
        
        # 添加促销信息
        promotions = get_current_promotions()
        if promotions:
            promo_info = "\n\n今日促销:\n"
            for promo in promotions:
                promo_info += f"- {promo.description}\n"
        else:
            promo_info = ""
        
        return base_instructions + menu_info + promo_info
    
    def _build_dynamic_tools(self, userdata: DriveThruUserdata) -> list[FunctionTool]:
        """
        动态构建工具函数 - 基于可用菜单项
        
        设计模式:
        1. 工厂模式创建工具
        2. 运行时类型检查
        3. 业务规则验证
        4. 错误处理包装
        """
        tools = []
        
        # 为每种菜单类型创建订单工具
        if userdata.combo_items:
            tools.append(self._build_combo_order_tool(userdata))
        
        if userdata.regular_items:
            tools.append(self._build_regular_order_tool(userdata))
        
        if userdata.happy_items:
            tools.append(self._build_happy_meal_tool(userdata))
        
        # 添加通用工具
        tools.extend([
            self._build_modify_order_tool(),
            self._build_checkout_tool(),
            self._build_cancel_order_tool(),
        ])
        
        return tools
    
    def _build_combo_order_tool(self, userdata: DriveThruUserdata) -> FunctionTool:
        """构建套餐订单工具"""
        available_combo_ids = {item.id for item in userdata.combo_items}
        available_drink_ids = {item.id for item in userdata.drink_items}
        available_sauce_ids = {item.id for item in userdata.sauce_items}
        
        @function_tool
        async def order_combo_meal(
            ctx: RunContext[DriveThruUserdata],
            meal_id: Annotated[str, Field(
                description="套餐ID",
                json_schema_extra={"enum": list(available_combo_ids)}
            )],
            drink_id: Annotated[str, Field(
                description="饮料ID", 
                json_schema_extra={"enum": list(available_drink_ids)}
            )],
            sauce_ids: Annotated[list[str], Field(
                description="酱料ID列表",
                json_schema_extra={"items": {"enum": list(available_sauce_ids)}}
            )] = [],
            special_requests: str = "",
        ) -> str:
            """
            套餐订单处理函数
            
            业务逻辑:
            1. 验证菜单项可用性
            2. 检查库存状态
            3. 计算价格(含促销)
            4. 更新订单状态
            5. 返回确认信息
            """
            try:
                # 验证输入
                meal_item = find_item_by_id(userdata.combo_items, meal_id)
                drink_item = find_item_by_id(userdata.drink_items, drink_id)
                sauce_items = find_items_by_id(userdata.sauce_items, sauce_ids)
                
                if not meal_item:
                    raise ToolError(f"套餐 {meal_id} 不存在")
                if not drink_item:
                    raise ToolError(f"饮料 {drink_id} 不存在")
                
                # 检查库存
                if not await check_inventory(meal_id, drink_id, sauce_ids):
                    raise ToolError("抱歉,部分商品库存不足")
                
                # 创建订单项
                combo_order = OrderedCombo(
                    meal=meal_item,
                    drink=drink_item,
                    sauces=sauce_items,
                    special_requests=special_requests,
                )
                
                # 计算价格(应用促销)
                base_price = combo_order.calculate_price()
                discount = apply_promotions(combo_order)
                final_price = base_price - discount
                
                # 更新订单
                ctx.userdata.order.add_item(combo_order)
                ctx.userdata.order.total_price += final_price
                
                # 返回确认
                confirmation = f"""
                已添加套餐:
                - {meal_item.name}
                - {drink_item.name}
                - 酱料:{', '.join(s.name for s in sauce_items)}
                """
                
                if special_requests:
                    confirmation += f"\n- 特殊要求:{special_requests}"
                
                if discount > 0:
                    confirmation += f"\n- 原价:${base_price:.2f}"
                    confirmation += f"\n- 优惠:-${discount:.2f}"
                
                confirmation += f"\n- 小计:${final_price:.2f}"
                confirmation += f"\n\n当前订单总额:${ctx.userdata.order.total_price:.2f}"
                
                return confirmation
                
            except Exception as e:
                logger.error(f"套餐订单处理失败: {e}", exc_info=True)
                raise ToolError(f"订单处理失败:{str(e)}")
        
        return order_combo_meal

3. 用户体验优化

3.1 自然对话设计

class NaturalConversationAgent(Agent):
    """
    自然对话代理 - 优化用户体验的设计模式
    
    设计原则:
    1. 对话式交互
    2. 上下文感知
    3. 个性化响应
    4. 错误容忍
    """
    
    def __init__(self):
        super().__init__(
            instructions="""
            你是一个自然、友好的AI助手。对话风格要求:
            
            1. 自然对话:
               - 使用日常对话语言,避免机械化表达
               - 适当使用语气词和连接词
               - 根据上下文调整语调
            
            2. 上下文感知:
               - 记住之前的对话内容
               - 理解隐含的意图和情感
               - 避免重复询问已知信息
            
            3. 错误处理:
               - 优雅处理模糊或不完整的输入
               - 主动澄清歧义
               - 提供有用的建议和选项
            
            4. 个性化:
               - 根据用户偏好调整回应
               - 记住用户的习惯和选择
               - 提供相关的个性化建议
            """,
        )
    
    async def on_user_turn_completed(
        self, 
        turn_ctx: ChatContext, 
        new_message: ChatMessage
    ) -> None:
        """
        用户回合完成处理 - 实现上下文感知和智能响应
        
        处理逻辑:
        1. 分析用户意图和情感
        2. 检查对话历史和模式
        3. 准备个性化响应策略
        4. 处理特殊情况(如沉默、重复等)
        """
        user_input = new_message.text_content or ""
        
        # 情感分析
        sentiment = await analyze_sentiment(user_input)
        
        # 意图识别
        intent = await classify_intent(user_input, turn_ctx.messages)
        
        # 检查对话模式
        conversation_pattern = analyze_conversation_pattern(turn_ctx.messages)
        
        # 个性化策略
        if sentiment == "frustrated":
            # 用户沮丧时的处理
            self.session.generate_reply(
                instructions="用户似乎有些沮丧,请用同理心回应并主动提供帮助。"
            )
        elif conversation_pattern == "repetitive":
            # 重复对话的处理
            self.session.generate_reply(
                instructions="用户可能在重复同样的问题,尝试用不同的方式解释或提供替代方案。"
            )
        elif intent == "unclear":
            # 意图不明确时的处理
            self.session.generate_reply(
                instructions="用户的意图不够清晰,请友善地要求澄清,并提供一些可能的选项。"
            )
        else:
            # 正常对话流程
            self.session.generate_reply()
    
    @function_tool
    async def smart_search(
        self,
        context: RunContext,
        query: str,
        search_type: Literal["web", "knowledge_base", "faq"] = "knowledge_base"
    ) -> str:
        """
        智能搜索工具 - 提供上下文感知的搜索结果
        
        特性:
        1. 多源搜索整合
        2. 结果排序和过滤
        3. 个性化推荐
        4. 相关性评分
        """
        # 扩展查询上下文
        expanded_query = await expand_query_with_context(
            query, 
            context.session.history
        )
        
        # 多源搜索
        results = await parallel_search(
            query=expanded_query,
            sources=[search_type, "related_topics"],
            user_profile=context.userdata.get("profile", {})
        )
        
        # 结果整合和排序
        ranked_results = rank_results_by_relevance(
            results, 
            query, 
            context.session.history
        )
        
        # 格式化响应
        if ranked_results:
            response = format_search_results(ranked_results[:3])  # 前3个结果
            
            # 添加相关建议
            related_queries = generate_related_queries(query, ranked_results)
            if related_queries:
                response += f"\n\n您可能还想了解:{', '.join(related_queries)}"
                
            return response
        else:
            return "抱歉,没有找到相关信息。您能提供更多详细信息吗?"

3.2 背景音频和氛围营造

async def entrypoint(ctx: JobContext):
    """
    完整的背景音频配置 - 提升用户体验
    
    背景音频功能:
    1. 营造氛围
    2. 掩盖技术噪音
    3. 提供听觉反馈
    4. 增强沉浸感
    """
    session = AgentSession(
        vad=ctx.proc.userdata["vad"],
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3"),
        tts=openai.TTS(voice="ash"),
    )
    
    # 配置背景音频
    background_audio = BackgroundAudioPlayer(
        # 思考时的背景音
        thinking_audio=AudioConfig(
            file_path="assets/thinking-ambience.ogg",
            volume=0.3,
            loop=True,
            fade_in_duration=0.5,
            fade_out_duration=0.5,
        ),
        
        # 打字音效
        typing_audio=AudioConfig(
            file_path="assets/keyboard-typing.ogg", 
            volume=0.2,
            loop=True,
        ),
        
        # 办公室环境音
        ambient_audio=AudioConfig(
            file_path="assets/office-ambience.ogg",
            volume=0.1,
            loop=True,
            continuous=True,  # 持续播放
        ),
    )
    
    # 启动会话和背景音频
    await session.start(agent=MyAgent(), room=ctx.room)
    await background_audio.start(room=ctx.room, agent_session=session)
    
    # 背景音频会根据代理状态自动切换:
    # - listening: 播放环境音
    # - thinking: 播放思考音 + 环境音
    # - speaking: 停止所有背景音

4. 安全和隐私最佳实践

4.1 敏感数据处理

class SecureAgent(Agent):
    """
    安全代理 - 处理敏感信息的最佳实践
    
    安全措施:
    1. 数据脱敏
    2. 访问控制
    3. 审计日志
    4. 合规检查
    """
    
    def __init__(self):
        super().__init__(
            instructions="""
            你是一个安全的AI助手。处理敏感信息时:
            1. 不要在响应中显示完整的敏感信息
            2. 使用掩码显示(如:****-1234)
            3. 提醒用户注意隐私保护
            4. 遵守数据保护法规
            """,
        )
    
    @function_tool
    async def process_payment_info(
        self,
        context: RunContext,
        card_number: Annotated[str, Field(description="信用卡号码")],
        expiry_date: Annotated[str, Field(description="有效期")],
        cvv: Annotated[str, Field(description="安全码")],
    ) -> str:
        """
        安全的支付信息处理
        
        安全措施:
        1. 立即加密敏感数据
        2. 不在日志中记录原始数据
        3. 使用安全的存储方式
        4. 实现审计跟踪
        """
        # 输入验证
        if not validate_card_number(card_number):
            raise ToolError("信用卡号码格式无效")
        
        if not validate_expiry_date(expiry_date):
            raise ToolError("有效期格式无效")
        
        if not validate_cvv(cvv):
            raise ToolError("安全码格式无效")
        
        # 数据脱敏用于日志
        masked_card = mask_card_number(card_number)
        
        # 记录审计日志(不包含敏感信息)
        audit_log = {
            "action": "payment_info_processed",
            "user_id": context.userdata.get("user_id"),
            "masked_card": masked_card,
            "timestamp": datetime.utcnow().isoformat(),
            "session_id": context.session.session_id,
        }
        logger.info("支付信息处理", extra=audit_log)
        
        try:
            # 加密存储敏感信息
            encrypted_data = encrypt_payment_info({
                "card_number": card_number,
                "expiry_date": expiry_date,
                "cvv": cvv,
            })
            
            # 安全存储
            payment_token = await store_encrypted_payment_info(
                encrypted_data, 
                context.userdata.get("user_id")
            )
            
            # 处理支付
            result = await process_payment_securely(payment_token)
            
            return f"支付信息已安全处理。卡号:{masked_card},处理结果:{result}"
            
        except Exception as e:
            # 安全错误处理
            logger.error("支付处理失败", extra={
                "error": str(e),
                "masked_card": masked_card,
                "user_id": context.userdata.get("user_id"),
            })
            raise ToolError("支付处理失败,请检查信息后重试。")
        finally:
            # 清理内存中的敏感数据
            clear_sensitive_variables(card_number, cvv)

4.2 访问控制和权限管理

class RoleBasedAgent(Agent):
    """
    基于角色的访问控制代理
    
    权限模型:
    1. 角色定义
    2. 权限检查
    3. 操作审计
    4. 会话隔离
    """
    
    def __init__(self, user_role: str):
        self.user_role = user_role
        self.permissions = get_role_permissions(user_role)
        
        super().__init__(
            instructions=f"""
            你是一个具有角色权限控制的助手。
            当前用户角色:{user_role}
            可用权限:{', '.join(self.permissions)}
            
            严格按照权限执行操作,拒绝未授权的请求。
            """,
        )
    
    def require_permission(self, permission: str):
        """权限检查装饰器"""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                if permission not in self.permissions:
                    logger.warning(f"权限拒绝: 用户角色 {self.user_role} 尝试执行 {permission}")
                    raise ToolError(f"权限不足,需要 {permission} 权限")
                return await func(*args, **kwargs)
            return wrapper
        return decorator
    
    @function_tool
    @require_permission("read_user_data")
    async def get_user_info(
        self, 
        context: RunContext, 
        user_id: str
    ) -> str:
        """获取用户信息 - 需要读取权限"""
        user_info = await fetch_user_info(user_id)
        
        # 根据权限过滤敏感信息
        filtered_info = filter_user_info_by_permission(
            user_info, 
            self.permissions
        )
        
        return json.dumps(filtered_info, ensure_ascii=False)
    
    @function_tool
    @require_permission("admin_operations")
    async def delete_user_data(
        self, 
        context: RunContext, 
        user_id: str
    ) -> str:
        """删除用户数据 - 需要管理员权限"""
        # 额外的安全检查
        if not await verify_admin_operation(
            context.userdata.get("user_id"), 
            "delete_user_data"
        ):
            raise ToolError("管理员操作需要额外验证")
        
        # 执行删除操作
        await delete_user_data_securely(user_id)
        
        # 记录审计日志
        audit_log = {
            "action": "user_data_deleted",
            "target_user": user_id,
            "operator": context.userdata.get("user_id"),
            "timestamp": datetime.utcnow().isoformat(),
        }
        logger.info("管理员操作", extra=audit_log)
        
        return f"用户 {user_id} 的数据已安全删除"

5. 测试和质量保证

5.1 自动化测试框架

import pytest
from livekit.agents import AgentSession
from livekit.agents.voice.run_result import RunResult

class TestVoiceAgent:
    """
    语音代理自动化测试套件
    
    测试范围:
    1. 功能测试
    2. 性能测试  
    3. 错误处理测试
    4. 集成测试
    """
    
    @pytest.mark.asyncio
    async def test_basic_conversation(self):
        """基础对话测试"""
        async with AgentSession(llm=openai.LLM()) as session:
            await session.start(MyAgent())
            
            # 测试用户输入处理
            result = await session.run(
                user_input="Hello, how are you today?"
            )
            
            # 验证响应
            result.expect.next_event().is_message(role="assistant")
            await result.expect.next_event().judge(
                llm=openai.LLM(),
                intent="assistant should greet the user and ask about their needs"
            )
    
    @pytest.mark.asyncio
    async def test_tool_execution(self):
        """工具执行测试"""
        async with AgentSession(llm=openai.LLM()) as session:
            await session.start(WeatherAgent())
            
            result = await session.run(
                user_input="What's the weather like in New York?"
            )
            
            # 验证工具调用序列
            result.expect.skip_next_event_if(type="message", role="assistant")
            result.expect.next_event().is_function_call(name="get_weather")
            result.expect.next_event().is_function_call_output()
            
            # 验证最终响应
            await (
                result.expect.next_event()
                .is_message(role="assistant")
                .judge(
                    llm=openai.LLM(),
                    intent="assistant should provide weather information for New York"
                )
            )
    
    @pytest.mark.asyncio
    async def test_error_handling(self):
        """错误处理测试"""
        async with AgentSession(llm=openai.LLM()) as session:
            await session.start(RobustAgent())
            
            # 测试API错误处理
            with mock.patch('external_api_call', side_effect=APITimeoutError()):
                result = await session.run(
                    user_input="Call the external API"
                )
                
                # 验证错误恢复
                result.expect.next_event().is_message(role="assistant")
                assert "抱歉" in result.events[-1].data.content
    
    @pytest.mark.asyncio 
    async def test_performance_metrics(self):
        """性能指标测试"""
        start_time = time.time()
        
        async with AgentSession(
            llm=openai.LLM(),
            preemptive_generation=True
        ) as session:
            await session.start(MyAgent())
            
            result = await session.run(
                user_input="Tell me a short joke"
            )
            
            response_time = time.time() - start_time
            
            # 性能断言
            assert response_time < 3.0, f"响应时间过长: {response_time}s"
            
            # 验证预先生成效果
            metrics = session.get_metrics()
            assert metrics.get("preemptive_generation_used", False)
    
    @pytest.mark.asyncio
    async def test_multi_agent_handoff(self):
        """多代理切换测试"""
        userdata = RestaurantUserData()
        
        async with AgentSession(llm=openai.LLM()) as session:
            await session.start(GreeterAgent(userdata))
            
            # 测试代理切换
            result = await session.run(
                user_input="I'd like to make a reservation"
            )
            
            # 验证切换到预订代理
            result.expect.next_event().is_agent_handoff(
                to_agent_type="ReservationAgent"
            )

5.2 性能基准测试

class PerformanceBenchmark:
    """
    性能基准测试套件
    
    测试指标:
    1. 响应延迟
    2. 吞吐量
    3. 资源使用率
    4. 并发能力
    """
    
    async def benchmark_response_latency(self):
        """响应延迟基准测试"""
        latencies = []
        
        for i in range(100):
            start_time = time.time()
            
            async with AgentSession(
                llm=openai.LLM(model="gpt-4o-mini"),
                preemptive_generation=True
            ) as session:
                await session.start(MyAgent())
                await session.run(user_input="Hello")
            
            latency = time.time() - start_time
            latencies.append(latency)
        
        # 统计分析
        avg_latency = sum(latencies) / len(latencies)
        p95_latency = sorted(latencies)[95]
        p99_latency = sorted(latencies)[99]
        
        print(f"平均延迟: {avg_latency:.2f}s")
        print(f"P95延迟: {p95_latency:.2f}s") 
        print(f"P99延迟: {p99_latency:.2f}s")
        
        # 性能断言
        assert avg_latency < 2.0, f"平均延迟过高: {avg_latency}s"
        assert p95_latency < 3.0, f"P95延迟过高: {p95_latency}s"
    
    async def benchmark_concurrent_sessions(self):
        """并发会话基准测试"""
        concurrent_sessions = 10
        tasks = []
        
        for i in range(concurrent_sessions):
            task = asyncio.create_task(self._run_session(f"user_{i}"))
            tasks.append(task)
        
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        total_time = time.time() - start_time
        
        # 分析结果
        successful_sessions = sum(1 for r in results if r.success)
        throughput = successful_sessions / total_time
        
        print(f"并发会话数: {concurrent_sessions}")
        print(f"成功会话数: {successful_sessions}")
        print(f"吞吐量: {throughput:.2f} sessions/s")
        
        assert successful_sessions >= concurrent_sessions * 0.95  # 95%成功率

这个最佳实践文档涵盖了LiveKit Agents框架的核心优化策略、架构设计模式、用户体验提升、安全实践和测试方法。每个部分都包含了详细的代码示例和实际应用场景,帮助开发者构建高质量的语音AI应用。