1. Module Overview

The LiveKit Agents framework is composed of several core modules, each responsible for a specific functional area. This document analyzes the architecture, APIs, and implementation details of each module in depth.

1.1 Core Module Architecture Diagram

graph TB
    subgraph "Application Layer"
        Agent[Agent<br/>Agent core]
        AgentSession[AgentSession<br/>Session management]
        JobContext[JobContext<br/>Job context]
    end
    
    subgraph "Voice Processing Layer"
        VoiceModule[voice module<br/>Voice processing core]
        AgentActivity[AgentActivity<br/>Agent activity management]
        RoomIO[RoomIO<br/>Room I/O management]
        SpeechHandle[SpeechHandle<br/>Speech handle]
    end
    
    subgraph "AI Components Layer"
        LLMModule[llm module<br/>Large language models]
        STTModule[stt module<br/>Speech-to-text]
        TTSModule[tts module<br/>Text-to-speech]
        VADModule[vad module<br/>Voice activity detection]
    end
    
    subgraph "Infrastructure Layer"
        IPCModule[ipc module<br/>Inter-process communication]
        MetricsModule[metrics module<br/>Metrics collection]
        UtilsModule[utils module<br/>Utility functions]
        CLIModule[cli module<br/>Command-line interface]
    end
    
    subgraph "Inference Layer"
        InferenceModule[inference module<br/>Inference service abstraction]
        InferenceRunner[InferenceRunner<br/>Inference runner]
    end
    
    Agent --> VoiceModule
    AgentSession --> AgentActivity
    AgentActivity --> RoomIO
    VoiceModule --> LLMModule
    VoiceModule --> STTModule
    VoiceModule --> TTSModule
    VoiceModule --> VADModule
    
    AgentSession --> IPCModule
    JobContext --> MetricsModule
    
    LLMModule --> InferenceModule
    STTModule --> InferenceModule
    TTSModule --> InferenceModule
    
    CLIModule --> JobContext
    InferenceModule --> InferenceRunner

2. voice Module - Voice Processing Core

2.1 Module Architecture

The voice module is the core of LiveKit Agents; it coordinates the entire flow of voice input, processing, and output.

graph TB
    subgraph "voice Module Architecture"
        subgraph "Core Components"
            Agent[Agent<br/>Agent definition]
            AgentSession[AgentSession<br/>Session manager]
            AgentActivity[AgentActivity<br/>Activity coordinator]
        end
        
        subgraph "I/O Management"
            RoomIO[RoomIO<br/>Room I/O]
            AudioInput[AudioInput<br/>Audio input]
            AudioOutput[AudioOutput<br/>Audio output]
            VideoInput[VideoInput<br/>Video input]
        end
        
        subgraph "Speech Processing"
            SpeechHandle[SpeechHandle<br/>Speech handle]
            Transcription[Transcription<br/>Transcription handling]
            BackgroundAudio[BackgroundAudio<br/>Background audio]
        end
        
        subgraph "Event System"
            Events[Events<br/>Event definitions]
            RunContext[RunContext<br/>Run context]
        end
    end
    
    AgentSession --> AgentActivity
    AgentActivity --> SpeechHandle
    AgentSession --> RoomIO
    RoomIO --> AudioInput
    RoomIO --> AudioOutput
    RoomIO --> VideoInput
    
    AgentActivity --> Transcription
    AgentSession --> BackgroundAudio
    AgentActivity --> Events
    Events --> RunContext

2.2 AgentSession in Detail

2.2.1 Core Attributes and State Management

class AgentSession(rtc.EventEmitter[EventTypes], Generic[Userdata_T]):
    """
    AgentSession is the runtime core of a voice agent and coordinates all components.
    
    Core responsibilities:
    1. Manage audio/video/text I/O streams
    2. Coordinate the STT, VAD, TTS, and LLM components
    3. Handle turn detection and endpointing
    4. Manage interruptions and multi-step tool calls
    5. Maintain session state and user data
    """
    
    def __init__(self, ...):
        # References to the core components
        self._stt: stt.STT | None = None              # speech-to-text component
        self._vad: vad.VAD | None = None              # voice activity detection component
        self._llm: llm.LLM | None = None              # large language model component
        self._tts: tts.TTS | None = None              # text-to-speech component
        self._mcp_servers: list[mcp.MCPServer] | None = None  # list of MCP servers
        
        # Error counters
        self._llm_error_counts = 0                    # LLM error count
        self._tts_error_counts = 0                    # TTS error count
        
        # I/O management
        self._input = io.AgentInput(                  # agent input manager
            self._on_video_input_changed, 
            self._on_audio_input_changed
        )
        self._output = io.AgentOutput(                # agent output manager
            self._on_video_output_changed,
            self._on_audio_output_changed, 
            self._on_text_output_changed
        )
        
        # Async task management
        self._forward_audio_atask: asyncio.Task[None] | None = None
        self._forward_video_atask: asyncio.Task[None] | None = None
        self._update_activity_atask: asyncio.Task[None] | None = None
        
        # Synchronization locks
        self._activity_lock = asyncio.Lock()          # activity lock
        self._lock = asyncio.Lock()                   # main lock
        
        # Core state
        self._agent: Agent | None = None              # current agent
        self._activity: AgentActivity | None = None   # current activity
        self._next_activity: AgentActivity | None = None  # next activity
        self._user_state: UserState = "listening"    # user state
        self._agent_state: AgentState = "initializing"  # agent state
        
        # User data and session lifecycle
        self._userdata: Userdata_T | None = userdata
        self._closing_task: asyncio.Task[None] | None = None
        self._closing: bool = False
        
        # Tracing and monitoring
        self._user_speaking_span: trace.Span | None = None
        self._agent_speaking_span: trace.Span | None = None
        self._session_span: trace.Span | None = None
2.2.2 Session Startup Flow

class AgentSession:
    async def start(
        self,
        agent: Agent,
        *,
        capture_run: bool = False,
        room: NotGivenOr[rtc.Room] = NOT_GIVEN,
        room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
        room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
    ) -> None | RunResult:
        """
        Main flow for starting an agent session.
        
        Startup steps:
        1. Validate and set the agent
        2. Initialize room I/O
        3. Create the agent activity
        4. Start the audio/video forwarding tasks
        5. Invoke the agent's on_enter callback
        
        Parameters:
        - agent: the agent instance to run
        - capture_run: whether to capture the run result (used for testing)
        - room: the LiveKit room object
        - room_input_options: room input configuration
        - room_output_options: room output configuration
        """
        
        # 1. Set the agent and the initial state
        self._agent = agent
        self._update_agent_state("initializing")
        
        # 2. Handle the room connection
        if is_given(room):
            self._room_io = room_io.RoomIO(room, self)
            await self._room_io.start(room_input_options, room_output_options)
        
        # 3. Create the agent activity manager
        async with self._activity_lock:
            self._activity = AgentActivity(agent, self)
            
        # 4. Start the audio/video forwarding tasks
        self._forward_audio_atask = asyncio.create_task(
            self._forward_audio_task()
        )
        self._forward_video_atask = asyncio.create_task(
            self._forward_video_task()
        )
        self._update_activity_atask = asyncio.create_task(
            self._update_activity_task()
        )
        
        # 5. Update the state and invoke the agent callback
        self._update_agent_state("listening")
        
        # Invoke the agent's on_enter callback
        if hasattr(agent, 'on_enter') and callable(agent.on_enter):
            await agent.on_enter()
        
        # 6. Handle test mode
        if capture_run:
            return self._global_run_state or RunResult()
2.2.3 Voice Processing Pipeline

class AgentSession:
    async def _forward_audio_task(self) -> None:
        """
        Audio forwarding task - the core loop that handles the audio stream.
        
        Processing flow:
        1. Watch for audio input changes
        2. Connect the STT and VAD components
        3. Handle speech recognition results
        4. Trigger agent response generation
        """
        while not self._closing:
            try:
                # Wait for the audio input to change
                audio_input = await self._input.wait_for_audio_input()
                
                if audio_input is None:
                    continue
                
                # Connect the speech processing components
                if self._stt:
                    await self._connect_stt_pipeline(audio_input)
                
                if self._vad:
                    await self._connect_vad_pipeline(audio_input)
                    
            except Exception as e:
                logger.error(f"audio forwarding task error: {e}")
                await asyncio.sleep(0.1)
    
    async def _connect_stt_pipeline(self, audio_input: AudioInput) -> None:
        """
        Connect the STT processing pipeline.
        
        Functions:
        1. Feed the audio stream into the STT component
        2. Handle transcription results
        3. Emit user input events
        """
        stt_stream = await self._stt.stream()
        
        async def _forward_audio():
            async for audio_frame in audio_input:
                await stt_stream.push_frame(audio_frame)
        
        async def _handle_transcription():
            async for event in stt_stream:
                if isinstance(event, stt.SpeechEvent):
                    if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                        # Emit the user input transcribed event
                        self.emit("user_input_transcribed", 
                                UserInputTranscribedEvent(
                                    text=event.alternatives[0].text,
                                    confidence=event.alternatives[0].confidence
                                ))
                        
                        # Trigger the agent response
                        if self._activity:
                            await self._activity.handle_user_input(event.alternatives[0].text)
        
        # Run both loops concurrently
        await asyncio.gather(
            _forward_audio(),
            _handle_transcription()
        )

2.3 AgentActivity - Activity Coordinator

2.3.1 Core Responsibilities

class AgentActivity(RecognitionHooks):
    """
    AgentActivity coordinates all activity for a single agent.
    
    Main responsibilities:
    1. Manage the agent's lifecycle
    2. Coordinate LLM calls and tool execution
    3. Handle speech synthesis and playback
    4. Manage the conversation context
    5. Handle interruptions and error recovery
    """
    
    def __init__(self, agent: Agent, sess: AgentSession) -> None:
        self._agent = agent                           # associated agent
        self._session = sess                          # associated session
        self._chat_ctx = agent.chat_ctx.copy()       # copy of the conversation context
        self._current_speech: SpeechHandle | None = None  # current speech handle
        self._pending_logs: list[str] = []           # pending log entries
        
        # State management
        self._generating = False                      # whether a response is being generated
        self._tool_calling = False                   # whether tools are being called
        self._interrupted = False                    # whether the activity was interrupted
        
        # Task management
        self._reply_task: asyncio.Task | None = None # reply generation task
        self._tool_tasks: set[asyncio.Task] = set()  # set of tool execution tasks
2.3.2 Response Generation Flow

class AgentActivity:
    async def _pipeline_reply_task(
        self,
        *,
        speech_handle: SpeechHandle,
        chat_ctx: llm.ChatContext,
        tools: list[llm.FunctionTool | llm.RawFunctionTool],
        model_settings: ModelSettings,
        new_message: llm.ChatMessage | None = None,
        instructions: str | None = None,
    ) -> None:
        """
        Response generation pipeline - handles the complete response generation flow.
        
        Steps:
        1. Prepare the LLM request context
        2. Call the LLM to generate a response
        3. Handle tool calls
        4. Run TTS synthesis
        5. Play the audio output
        6. Update the conversation history
        """
        
        # 1. Prepare the request context
        if new_message:
            chat_ctx = chat_ctx.append(
                role=new_message.role,
                content=new_message.content
            )
        
        # 2. Call the LLM to generate a response
        self._session._update_agent_state("thinking")
        
        try:
            llm_stream = await self._agent.llm_node(
                chat_ctx=chat_ctx,
                tools=tools,
                model_settings=model_settings
            )
            
            # 3. Process the streaming response
            response_text = ""
            tool_calls = []
            
            async for chunk in llm_stream:
                if speech_handle.interrupted:
                    break
                
                if isinstance(chunk, llm.ChatChunk):
                    if chunk.choices:
                        choice = chunk.choices[0]
                        if choice.delta.content:
                            response_text += choice.delta.content
                            
                        if choice.delta.tool_calls:
                            tool_calls.extend(choice.delta.tool_calls)
            
            # 4. Execute tool calls
            if tool_calls and not speech_handle.interrupted:
                tool_results = await self._execute_tools(tool_calls)
                
                # Update the chat context
                for call, result in zip(tool_calls, tool_results):
                    chat_ctx = chat_ctx.append(
                        role="assistant",
                        tool_calls=[call]
                    ).append(
                        role="tool",
                        content=result,
                        tool_call_id=call.id
                    )
                
                # Recurse to generate the final response
                return await self._pipeline_reply_task(
                    speech_handle=speech_handle,
                    chat_ctx=chat_ctx,
                    tools=tools,
                    model_settings=model_settings
                )
            
            # 5. TTS synthesis and playback
            if response_text and not speech_handle.interrupted:
                await self._synthesize_and_play(response_text, speech_handle)
            
            # 6. Update the conversation history
            self._chat_ctx = chat_ctx.append(
                role="assistant",
                content=response_text
            )
            
        except Exception as e:
            logger.error(f"response generation failed: {e}")
            await self._handle_generation_error(e, speech_handle)
        finally:
            self._session._update_agent_state("listening")
    
    async def _execute_tools(
        self, 
        tool_calls: list[llm.FunctionCall]
    ) -> list[str]:
        """
        Execute tool calls.
        
        Parameters:
        - tool_calls: the list of tool calls to execute
        
        Returns:
        - list[str]: the list of tool execution results
        """
        results = []
        
        for call in tool_calls:
            try:
                # Look up the matching tool function
                tool_fn = self._find_tool_function(call.function.name)
                if not tool_fn:
                    results.append(f"tool {call.function.name} not found")
                    continue
                
                # Parse the arguments
                args = json.loads(call.function.arguments)
                
                # Execute the tool function
                result = await tool_fn(**args)
                results.append(str(result) if result is not None else "")
                
            except Exception as e:
                logger.error(f"tool execution failed {call.function.name}: {e}")
                results.append(f"tool execution failed: {e}")
        
        return results
    
    async def _synthesize_and_play(
        self, 
        text: str, 
        speech_handle: SpeechHandle
    ) -> None:
        """
        Synthesize and play speech.
        
        Parameters:
        - text: the text to synthesize
        - speech_handle: the speech handle
        """
        if not self._session._tts:
            return
        
        try:
            # Start TTS synthesis
            tts_stream = await self._session._tts.synthesize(text)
            
            # Update the agent state
            self._session._update_agent_state("speaking")
            
            # Play the audio
            async for audio_frame in tts_stream:
                if speech_handle.interrupted:
                    break
                
                # Push to the audio output
                await self._session._output.audio_output.push_frame(audio_frame)
            
        except Exception as e:
            logger.error(f"TTS synthesis failed: {e}")
            raise
        finally:
            self._session._update_agent_state("listening")
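
For _execute_tools and _find_tool_function to resolve a call by name, the agent has to expose tool functions under matching names. The sketch below shows one way such a tool might be declared; the function_tool decorator and RunContext follow the public livekit-agents API, while the weather lookup itself is a made-up placeholder.

# Sketch of a tool declaration that could be resolved by name during tool execution.
from livekit.agents import Agent, RunContext, function_tool

class WeatherAgent(Agent):
    @function_tool
    async def get_weather(self, context: RunContext, city: str) -> str:
        """Return the current weather for a city."""
        # A real tool would call an external weather service here.
        return f"It is sunny in {city} today."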

2.4 RoomIO - Room I/O Management

2.4.1 Input and Output Configuration

@dataclass
class RoomInputOptions:
    """
    Room input configuration options.
    
    Option descriptions:
    - text_enabled: whether text input is enabled
    - audio_enabled: whether audio input is enabled
    - video_enabled: whether video input is enabled
    - audio_sample_rate: audio sample rate
    - audio_num_channels: number of audio channels
    - noise_cancellation: noise cancellation configuration
    - participant_kinds: accepted participant kinds
    - pre_connect_audio: pre-connect audio feature
    """
    text_enabled: NotGivenOr[bool] = NOT_GIVEN
    audio_enabled: NotGivenOr[bool] = NOT_GIVEN
    video_enabled: NotGivenOr[bool] = NOT_GIVEN
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    noise_cancellation: rtc.NoiseCancellationOptions | None = None
    text_input_cb: TextInputCallback = _default_text_input_cb
    participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN
    participant_identity: NotGivenOr[str] = NOT_GIVEN
    pre_connect_audio: bool = True
    pre_connect_audio_timeout: float = 3.0
    close_on_disconnect: bool = True
    delete_room_on_close: bool = True

@dataclass 
class RoomOutputOptions:
    """
    Room output configuration options.
    
    Option descriptions:
    - transcription_enabled: whether transcription output is enabled
    - audio_enabled: whether audio output is enabled
    - audio_sample_rate: audio sample rate
    - audio_publish_options: audio publish options
    """
    transcription_enabled: NotGivenOr[bool] = NOT_GIVEN
    audio_enabled: NotGivenOr[bool] = NOT_GIVEN
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    audio_publish_options: rtc.TrackPublishOptions = field(
        default_factory=lambda: rtc.TrackPublishOptions(
            source=rtc.TrackSource.SOURCE_MICROPHONE
        )
    )
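
Both option objects are passed to AgentSession.start() together with the room, matching the signature shown in section 2.2.2. A minimal sketch, reusing the session/agent/ctx names from the earlier examples:

# Sketch: wiring input/output options into session.start()
await session.start(
    agent,
    room=ctx.room,
    room_input_options=RoomInputOptions(
        audio_enabled=True,
        video_enabled=False,
        pre_connect_audio=True,
    ),
    room_output_options=RoomOutputOptions(
        transcription_enabled=True,
        audio_enabled=True,
    ),
)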

2.4.2 RoomIO Implementation

class RoomIO:
    """
    RoomIO manages the audio/video I/O connections with a LiveKit room.
    
    Main functions:
    1. Manage participant connects and disconnects
    2. Handle subscribing to and publishing audio/video tracks
    3. Route the input and output streams
    4. Handle room events and state changes
    """
    
    def __init__(self, room: rtc.Room, session: AgentSession) -> None:
        self._room = room
        self._session = session
        self._participant: rtc.RemoteParticipant | None = None
        self._audio_input: AudioInput | None = None
        self._video_input: VideoInput | None = None
        self._audio_output: AudioOutput | None = None
        self._text_output: TextOutput | None = None
        
        # Stream management
        self._audio_streams: dict[str, _ParticipantAudioInputStream] = {}
        self._video_streams: dict[str, _ParticipantVideoInputStream] = {}
        
        # Pre-connect audio handling
        self._pre_connect_handler: PreConnectAudioHandler | None = None
    
    async def start(
        self,
        input_options: RoomInputOptions | None = None,
        output_options: RoomOutputOptions | None = None,
    ) -> None:
        """
        Start room I/O management.
        
        Startup flow:
        1. Apply the input/output options
        2. Register room event listeners
        3. Find and link the target participant
        4. Set up the input/output streams and pre-connect audio
        """
        
        # 1. Apply default options
        input_options = input_options or RoomInputOptions()
        output_options = output_options or RoomOutputOptions()
        
        # 2. Register room event listeners
        self._room.on("participant_connected", self._on_participant_connected)
        self._room.on("participant_disconnected", self._on_participant_disconnected)
        self._room.on("track_published", self._on_track_published)
        self._room.on("track_unpublished", self._on_track_unpublished)
        self._room.on("track_subscribed", self._on_track_subscribed)
        self._room.on("track_unsubscribed", self._on_track_unsubscribed)
        
        # 3. Find the target participant
        await self._find_and_link_participant(input_options)
        
        # 4. Set up the input and output streams
        await self._setup_input_streams(input_options)
        await self._setup_output_streams(output_options)
        
        # 5. Start pre-connect audio handling
        if input_options.pre_connect_audio:
            self._pre_connect_handler = PreConnectAudioHandler(
                room=self._room,
                timeout=input_options.pre_connect_audio_timeout
            )
            await self._pre_connect_handler.start()
    
    async def _find_and_link_participant(
        self, 
        options: RoomInputOptions
    ) -> None:
        """
        Find and link the target participant.
        
        Lookup strategy:
        1. Prefer the participant with the specified identity
        2. Filter by participant kind
        3. Pick the first participant that matches
        """
        participant_kinds = (
            options.participant_kinds 
            if is_given(options.participant_kinds) 
            else DEFAULT_PARTICIPANT_KINDS
        )
        
        # Look through existing participants
        for participant in self._room.remote_participants.values():
            if participant.kind not in participant_kinds:
                continue
                
            if is_given(options.participant_identity):
                if participant.identity == options.participant_identity:
                    await self._link_participant(participant)
                    return
            else:
                await self._link_participant(participant)
                return
        
        # If none was found, wait for a new participant to join
        if not self._participant:
            logger.info("waiting for a participant to join the room...")
    
    async def _link_participant(self, participant: rtc.RemoteParticipant) -> None:
        """
        Link to a specific participant.
        
        Functions:
        1. Store the participant reference
        2. Subscribe to its audio/video tracks
        3. Start stream processing
        """
        self._participant = participant
        logger.info(f"linked to participant: {participant.identity}")
        
        # Subscribe to existing tracks
        for track_pub in participant.track_publications.values():
            if track_pub.track is not None:
                await self._handle_track_subscription(track_pub.track, participant)
    async def _setup_input_streams(self, options: RoomInputOptions) -> None:
        """Set up input stream handling."""
        if is_given(options.audio_enabled) and options.audio_enabled:
            self._audio_input = AudioInput()
            self._session._input.set_audio_input(self._audio_input)
        
        if is_given(options.video_enabled) and options.video_enabled:
            self._video_input = VideoInput()
            self._session._input.set_video_input(self._video_input)
    
    async def _setup_output_streams(self, options: RoomOutputOptions) -> None:
        """Set up output stream handling."""
        if is_given(options.audio_enabled) and options.audio_enabled:
            self._audio_output = _ParticipantAudioOutput(
                room=self._room,
                sample_rate=options.audio_sample_rate,
                num_channels=options.audio_num_channels,
                publish_options=options.audio_publish_options
            )
            self._session._output.set_audio_output(self._audio_output)
        
        if is_given(options.transcription_enabled) and options.transcription_enabled:
            self._text_output = _ParticipantTranscriptionOutput(
                room=self._room
            )
            self._session._output.set_text_output(self._text_output)
3. Voice Processing Sequence Diagrams

3.1 Full Conversation Flow

sequenceDiagram
    participant User as User
    participant RoomIO as RoomIO
    participant AgentSession as AgentSession
    participant AgentActivity as AgentActivity
    participant STT as STT component
    participant LLM as LLM component
    participant Tools as Tool functions
    participant TTS as TTS component
    
    User->>RoomIO: Starts speaking
    RoomIO->>AgentSession: Audio stream input
    AgentSession->>STT: Forward audio frames
    STT->>AgentSession: Return transcribed text
    AgentSession->>AgentActivity: Emit user input event
    
    AgentActivity->>LLM: Send chat context
    LLM->>AgentActivity: Return streaming response
    
    alt Response contains tool calls
        AgentActivity->>Tools: Execute tool functions
        Tools->>AgentActivity: Return execution results
        AgentActivity->>LLM: Send tool results
        LLM->>AgentActivity: Return final response
    end
    
    AgentActivity->>TTS: Send response text
    TTS->>AgentActivity: Return audio stream
    AgentActivity->>RoomIO: Forward audio output
    RoomIO->>User: Play the AI reply

3.2 Interruption Handling Flow

sequenceDiagram
    participant User as User
    participant VAD as VAD component
    participant AgentSession as AgentSession
    participant SpeechHandle as SpeechHandle
    participant TTS as TTS component
    
    Note over AgentSession: Agent is speaking
    User->>VAD: Starts speaking (interruption)
    VAD->>AgentSession: Voice activity detected
    AgentSession->>SpeechHandle: Set the interrupt flag
    SpeechHandle->>TTS: Stop the current synthesis
    TTS->>SpeechHandle: Acknowledge the stop
    SpeechHandle->>AgentSession: Interruption complete
    AgentSession->>AgentSession: Start processing the new input
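
The interrupt flag checked inside the playback loops of sections 2.3.2 can be modeled as nothing more than an asyncio.Event. The following is a minimal sketch of that idea, not the actual SpeechHandle implementation:

# Minimal sketch of an interruptible speech handle (illustrative only)
import asyncio

class SimpleSpeechHandle:
    def __init__(self) -> None:
        self._interrupt_ev = asyncio.Event()

    @property
    def interrupted(self) -> bool:
        # Playback loops poll this flag between audio frames.
        return self._interrupt_ev.is_set()

    def interrupt(self) -> None:
        # Called when VAD reports user speech while the agent is speaking.
        self._interrupt_ev.set()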

4. Key Data Structures

4.1 Voice Processing States

class UserState(str, Enum):
    """User state enum"""
    LISTENING = "listening"      # waiting for user speech
    SPEAKING = "speaking"        # user is currently speaking
    AWAY = "away"               # user is away/inactive

class AgentState(str, Enum):
    """Agent state enum"""
    INITIALIZING = "initializing"  # session is starting up
    LISTENING = "listening"        # waiting for user input
    THINKING = "thinking"          # LLM inference in progress
    SPEAKING = "speaking"          # TTS playback in progress
    TOOL_CALLING = "tool_calling"  # executing tool calls

@dataclass
class VoiceOptions:
    """Voice processing options"""
    allow_interruptions: bool = True
    discard_audio_if_uninterruptible: bool = True
    min_interruption_duration: float = 0.5
    min_interruption_words: int = 0
    min_endpointing_delay: float = 0.5
    max_endpointing_delay: float = 6.0
    max_tool_steps: int = 3
    user_away_timeout: float | None = 15.0
    false_interruption_timeout: float | None = 2.0
    resume_false_interruption: bool = True
    min_consecutive_speech_delay: float = 0.0
    preemptive_generation: bool = False
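
Assuming these options map onto AgentSession keyword arguments with the same names (which holds for most of them in the public constructor), tuning interruption and endpointing behavior looks like the hedged sketch below:

# Sketch: tuning interruption/endpointing behavior at construction time
# (component instances elided; kwarg names mirror the VoiceOptions fields above)
session = AgentSession(
    stt=..., llm=..., tts=..., vad=...,
    allow_interruptions=True,
    min_interruption_duration=0.5,
    min_endpointing_delay=0.5,
    max_endpointing_delay=6.0,
    max_tool_steps=3,
    preemptive_generation=False,
)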

4.2 Event System

@dataclass
class UserInputTranscribedEvent:
    """User input transcribed event"""
    text: str                    # transcribed text
    confidence: float           # confidence score
    is_final: bool = True       # whether this is the final result

@dataclass  
class AgentStateChangedEvent:
    """Agent state changed event"""
    previous_state: AgentState  # previous state
    current_state: AgentState   # current state
    
@dataclass
class SpeechCreatedEvent:
    """Speech created event"""
    speech_handle: SpeechHandle # speech handle
    source: str                 # source description

@dataclass
class MetricsCollectedEvent:
    """Metrics collected event"""
    metrics: dict[str, Any]     # metrics data
    timestamp: float            # timestamp
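
Because AgentSession extends rtc.EventEmitter (section 2.2.1), application code can register handlers for these events. A hedged sketch: the "user_input_transcribed" event name appears in the pipeline code in section 2.2.3, while "agent_state_changed" is an assumption inferred from the dataclass above.

# Sketch: observing session events (handler bodies are placeholders)
@session.on("user_input_transcribed")
def _on_transcribed(ev: UserInputTranscribedEvent) -> None:
    print(f"user said: {ev.text!r} (confidence={ev.confidence:.2f})")

@session.on("agent_state_changed")  # assumed event name
def _on_state_changed(ev: AgentStateChangedEvent) -> None:
    print(f"agent state: {ev.previous_state} -> {ev.current_state}")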

5. Performance Optimization Features

5.1 Preemptive Generation

class AgentActivity:
    async def _handle_preemptive_generation(
        self,
        user_input: str,
        chat_ctx: llm.ChatContext
    ) -> None:
        """
        Preemptive generation implementation.
        
        Optimization strategy:
        1. Start LLM inference while the user is still speaking
        2. Run in parallel with audio processing
        3. Reduce response latency
        4. Handle interruption and cancellation logic
        """
        if not self._session._opts.preemptive_generation:
            return
        
        # Create the preemptive generation task
        preemptive_task = asyncio.create_task(
            self._generate_preemptive_response(user_input, chat_ctx)
        )
        
        # Watch for interruption
        try:
            result = await preemptive_task
            if not self._interrupted:
                # Use the preemptively generated result
                await self._use_preemptive_result(result)
        except asyncio.CancelledError:
            logger.debug("preemptive generation cancelled")
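
Stripped of framework details, the technique reduces to racing a speculative inference task against an interruption signal. The generic asyncio sketch below illustrates the pattern; it is not the library's implementation.

# Generic sketch of "start early, cancel if invalidated"
import asyncio

async def preemptive(generate, interrupted: asyncio.Event):
    task = asyncio.create_task(generate())            # start LLM inference early
    ev_wait = asyncio.create_task(interrupted.wait())
    await asyncio.wait({task, ev_wait}, return_when=asyncio.FIRST_COMPLETED)
    if interrupted.is_set():
        task.cancel()                                  # discard the speculative result
        return None
    ev_wait.cancel()
    return task.result()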

5.2 Error Recovery Mechanism

class AgentActivity:
    async def _handle_generation_error(
        self,
        error: Exception,
        speech_handle: SpeechHandle
    ) -> None:
        """
        Error recovery handling.
        
        Recovery strategy:
        1. Distinguish recoverable from unrecoverable errors
        2. Retry with exponential backoff
        3. Count errors and apply a circuit-breaker threshold
        4. Degrade gracefully
        """
        if isinstance(error, llm.LLMError):
            self._session._llm_error_counts += 1
            
            if error.recoverable and self._session._llm_error_counts < 3:
                # Recoverable error: back off and retry
                await asyncio.sleep(2 ** self._session._llm_error_counts)
                logger.warning(f"LLM error, retrying: {error}")
                return
            else:
                # Unrecoverable error or retry limit exceeded
                logger.error(f"LLM error, stopping generation: {error}")
                speech_handle.interrupt()
        
        elif isinstance(error, tts.TTSError):
            self._session._tts_error_counts += 1
            logger.error(f"TTS error: {error}")
            # On TTS errors, fall back to text output
            await self._fallback_to_text_output(speech_handle)
This core-module analysis has walked through the architecture and implementation of the voice module, the most important module in the LiveKit Agents framework. Each component is presented with its code-level implementation, processing flow, and optimization strategies, to help developers understand how the framework works at its core.