OpenAIAgent-09-Streaming
模块概览
1. 模块职责与边界
Streaming 模块是 OpenAI Agents Python SDK 的实时数据流核心,负责管理代理执行过程中的流式数据传输和事件处理。该模块通过事件驱动的架构提供低延迟的实时响应能力,支持流式输出、增量更新和实时交互等场景。
核心职责
- 流式事件管理:处理代理执行过程中的各种流式事件
- 实时数据传输:提供低延迟的数据流传输机制
- 事件类型分发:区分和路由不同类型的流式事件
- 增量内容处理:支持增量式的内容生成和传输
- 状态同步:维护流式处理过程中的状态一致性
- 错误处理:处理流式传输中的异常和中断情况
流式事件体系
| 事件类型 | 事件类 | 触发时机 | 主要用途 |
|---|---|---|---|
| 原始响应事件 | RawResponsesStreamEvent | LLM返回原始流式数据 | 底层数据流传输 |
| 运行项事件 | RunItemStreamEvent | 代理处理生成RunItem | 结构化内容更新 |
| 代理更新事件 | AgentUpdatedStreamEvent | 代理切换或更新 | 代理状态变更通知 |
| 实时会话事件 | RealtimeSessionEvent | 实时会话状态变化 | 实时交互管理 |
| 语音流事件 | VoiceStreamEvent | 语音处理流程 | 语音交互处理 |
流式数据类型
| 数据类型 | 对应场景 | 关键信息 | 处理方式 |
|---|---|---|---|
| message_output_created | 消息输出生成 | 文本内容、角色信息 | 增量文本渲染 |
| tool_called | 工具调用请求 | 工具名称、参数 | 工具执行状态更新 |
| tool_output | 工具执行结果 | 返回值、状态 | 结果展示和处理 |
| handoff_requested | 代理交接请求 | 目标代理、原因 | 交接流程管理 |
| handoff_occured | 代理交接完成 | 交接结果 | 状态同步更新 |
输入输出接口
输入:
- 模型流式响应(TResponseStreamEvent)
- 代理运行项目(RunItem)
- 代理实例更新(Agent)
- 配置参数(队列大小、超时等)
输出:
- 统一流式事件(StreamEvent)
- 实时状态更新
- 错误和异常通知
- 完成状态信号
上下游依赖关系
上游调用者:
- Runner.run_streamed():流式执行的主入口
- RunImpl:执行引擎的流式事件生成
- RealtimeSession:实时会话的事件管理
- VoicePipeline:语音处理的流式输出
下游依赖:
- asyncio.Queue:异步队列用于事件传输
- items:运行项目的数据结构
- agent:代理实例和状态
- exceptions:异常处理和错误传播
2. 模块架构图
flowchart TB
subgraph "Streaming 流式处理模块"
subgraph "核心事件类型"
RAWRESPONSE[RawResponsesStreamEvent]
RUNITEM[RunItemStreamEvent]
AGENTUPDATE[AgentUpdatedStreamEvent]
STREAMEVENT[StreamEvent 联合类型]
end
subgraph "运行项事件"
MESSAGEOUTPUT[message_output_created]
TOOLCALLED[tool_called]
TOOLOUTPUT[tool_output]
HANDOFFREQ[handoff_requested]
HANDOFFOCCUR[handoff_occured]
REASONING[reasoning_item_created]
MCPAPPROVAL[mcp_approval_requested]
MCPTOOLS[mcp_list_tools]
end
subgraph "实时会话事件"
REALTIMESESSION[RealtimeSessionEvent]
AGENTSTART[RealtimeAgentStartEvent]
AGENTEND[RealtimeAgentEndEvent]
HANDOFFEVENT[RealtimeHandoffEvent]
TOOLSTART[RealtimeToolStart]
TOOLEND[RealtimeToolEnd]
AUDIOEVENTS[RealtimeAudio*]
ERROREVENTS[RealtimeError]
end
subgraph "语音流事件"
VOICESTREAM[VoiceStreamEvent]
VOICEAUDIO[VoiceStreamEventAudio]
VOICELIFECYCLE[VoiceStreamEventLifecycle]
VOICEERROR[VoiceStreamEventError]
end
subgraph "流式处理核心"
QUEUE[asyncio.Queue]
STREAMPROCESSOR[流式事件处理器]
EVENTDISPATCHER[事件分发器]
STATEMANAGER[状态管理器]
end
subgraph "数据转换"
ITEMTOEVENT[RunItem到Event转换]
RESPONSEFILER[响应过滤器]
EVENTSERIALIZER[事件序列化]
end
subgraph "生命周期管理"
QUEUESENTINEL[QueueCompleteSentinel]
STREAMCOMPLETE[流式完成信号]
ERRORHANDLING[错误处理]
CLEANUP[资源清理]
end
end
subgraph "执行集成"
RUNNER["Runner.run_streamed()"]
RUNIMPL[RunImpl 执行引擎]
REALTIMESESS[RealtimeSession 实时会话]
VOICEPIPELINE[VoicePipeline 语音处理]
end
subgraph "数据源"
MODEL[Model 模型响应]
LLMSTREAM[LLM 流式输出]
AGENT[Agent 代理状态]
TOOLS[Tools 工具执行]
HANDOFFS[Handoffs 代理交接]
end
subgraph "消费者"
WEBAPP[Web 应用前端]
CHATUI[聊天界面]
VOICEUI[语音界面]
MONITORING[监控系统]
LOGGING[日志系统]
end
STREAMEVENT --> RAWRESPONSE
STREAMEVENT --> RUNITEM
STREAMEVENT --> AGENTUPDATE
RUNITEM --> MESSAGEOUTPUT
RUNITEM --> TOOLCALLED
RUNITEM --> TOOLOUTPUT
RUNITEM --> HANDOFFREQ
RUNITEM --> HANDOFFOCCUR
RUNITEM --> REASONING
RUNITEM --> MCPAPPROVAL
RUNITEM --> MCPTOOLS
REALTIMESESSION --> AGENTSTART
REALTIMESESSION --> AGENTEND
REALTIMESESSION --> HANDOFFEVENT
REALTIMESESSION --> TOOLSTART
REALTIMESESSION --> TOOLEND
REALTIMESESSION --> AUDIOEVENTS
REALTIMESESSION --> ERROREVENTS
VOICESTREAM --> VOICEAUDIO
VOICESTREAM --> VOICELIFECYCLE
VOICESTREAM --> VOICEERROR
STREAMPROCESSOR --> QUEUE
STREAMPROCESSOR --> EVENTDISPATCHER
STREAMPROCESSOR --> STATEMANAGER
ITEMTOEVENT --> RESPONSEFILER
ITEMTOEVENT --> EVENTSERIALIZER
STREAMPROCESSOR --> QUEUESENTINEL
STREAMPROCESSOR --> STREAMCOMPLETE
STREAMPROCESSOR --> ERRORHANDLING
STREAMPROCESSOR --> CLEANUP
RUNNER --> STREAMPROCESSOR
RUNIMPL --> ITEMTOEVENT
REALTIMESESS --> REALTIMESESSION
VOICEPIPELINE --> VOICESTREAM
MODEL --> RAWRESPONSE
LLMSTREAM --> RAWRESPONSE
AGENT --> AGENTUPDATE
TOOLS --> TOOLCALLED
TOOLS --> TOOLOUTPUT
HANDOFFS --> HANDOFFREQ
HANDOFFS --> HANDOFFOCCUR
EVENTDISPATCHER --> WEBAPP
EVENTDISPATCHER --> CHATUI
EVENTDISPATCHER --> VOICEUI
STREAMPROCESSOR --> MONITORING
STREAMPROCESSOR --> LOGGING
style STREAMEVENT fill:#e1f5fe
style STREAMPROCESSOR fill:#f3e5f5
style QUEUE fill:#e8f5e8
style ITEMTOEVENT fill:#fff3e0
style REALTIMESESSION fill:#ffebee
架构说明:
分层事件处理设计
- 事件类型层:定义不同类型的流式事件和其数据结构
- 事件处理层:负责事件的生成、转换和分发
- 队列传输层:基于异步队列的高效数据传输
- 状态管理层:维护流式处理过程中的状态一致性
事件分类体系
- 原始事件:直接来自LLM的原始流式数据
- 结构化事件:经过处理的结构化运行项事件
- 状态事件:代理状态变更和生命周期事件
- 实时事件:实时会话和语音交互的专用事件
异步队列机制
- 生产者模式:执行引擎产生流式事件放入队列
- 消费者模式:应用层从队列消费事件进行处理
- 背压控制:通过队列大小限制控制内存使用
- 完成信号:通过哨兵对象标识流式处理完成
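下面给出一个极简的生产者/消费者示意(非 SDK 源码,队列大小与事件内容均为假设),演示上述"异步队列 + 哨兵对象"的背压与完成信号机制:
import asyncio

_SENTINEL = object()  # 哨兵对象:标识流式处理完成

async def producer(queue: asyncio.Queue) -> None:
    for i in range(3):
        await queue.put(f"event-{i}")  # 队列满时 put 会等待,从而形成背压
    await queue.put(_SENTINEL)         # 发送完成信号

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        if item is _SENTINEL:          # 读到哨兵即结束消费
            break
        print("consume:", item)

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=16)
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())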
实时性保障
- 低延迟传输:异步队列确保事件的快速传递
- 增量更新:支持增量式内容生成和展示
- 状态同步:实时同步代理和工具的执行状态
- 错误恢复:流式处理中的错误处理和恢复机制
3. 关键算法与流程剖析
3.1 流式事件生成算法
class RunImpl:
"""执行引擎中的流式事件生成逻辑"""
@classmethod
def stream_step_items_to_queue(
cls,
new_step_items: list[RunItem],
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
"""将运行步骤项目转换为流式事件并放入队列"""
for item in new_step_items:
# 1) 根据运行项类型创建相应的流式事件
event = None
if isinstance(item, MessageOutputItem):
# 消息输出事件:文本生成完成
event = RunItemStreamEvent(
item=item,
name="message_output_created"
)
elif isinstance(item, ToolCallItem):
# 工具调用事件:工具被调用
event = RunItemStreamEvent(
item=item,
name="tool_called"
)
elif isinstance(item, ToolCallOutputItem):
# 工具输出事件:工具执行完成
event = RunItemStreamEvent(
item=item,
name="tool_output"
)
elif isinstance(item, HandoffCallItem):
# 交接请求事件:代理交接被请求
event = RunItemStreamEvent(
item=item,
name="handoff_requested"
)
elif isinstance(item, HandoffOutputItem):
# 交接完成事件:代理交接已完成
event = RunItemStreamEvent(
item=item,
name="handoff_occured" # 注意:拼写错误但为兼容性保留
)
elif isinstance(item, ReasoningItem):
# 推理项事件:推理过程生成
event = RunItemStreamEvent(
item=item,
name="reasoning_item_created"
)
elif isinstance(item, MCPApprovalRequestItem):
# MCP审批请求事件
event = RunItemStreamEvent(
item=item,
name="mcp_approval_requested"
)
elif isinstance(item, MCPListToolsItem):
# MCP工具列表事件
event = RunItemStreamEvent(
item=item,
name="mcp_list_tools"
)
else:
# 2) 处理未知类型的运行项
logger.warning(f"Unexpected item type: {type(item)}")
continue
# 3) 将事件放入异步队列(非阻塞)
if event:
try:
queue.put_nowait(event)
except asyncio.QueueFull:
logger.error("Stream event queue is full, dropping event")
算法目的: 将执行引擎产生的运行项目转换为标准化的流式事件,实现数据流的结构化传输。
转换策略特点:
- 类型映射:根据运行项类型映射到相应的事件名称
- 数据封装:将运行项数据封装为统一的事件结构
- 非阻塞处理:使用非阻塞队列操作避免阻塞执行流程
- 错误容忍:未知类型的运行项不会中断整个流程
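作为补充,下面是一个示意性的简化写法(非 SDK 源码):把"运行项类型 → 事件名称"的映射收敛为一张查找表,未知类型返回 None,与上面 if/elif 链的语义一致:
# 假设的映射表,键为运行项类型名,值为流式事件名称
ITEM_TYPE_TO_EVENT_NAME = {
    "MessageOutputItem": "message_output_created",
    "ToolCallItem": "tool_called",
    "ToolCallOutputItem": "tool_output",
    "HandoffCallItem": "handoff_requested",
    "HandoffOutputItem": "handoff_occured",  # SDK 为兼容性保留的拼写
    "ReasoningItem": "reasoning_item_created",
    "MCPApprovalRequestItem": "mcp_approval_requested",
    "MCPListToolsItem": "mcp_list_tools",
}

def event_name_for(item: object) -> str | None:
    """根据运行项的类型名查找事件名;未知类型返回 None,由调用方决定跳过或告警。"""
    return ITEM_TYPE_TO_EVENT_NAME.get(type(item).__name__)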
3.2 流式响应处理算法
async def run_streamed(
agent: Agent[TContext],
input: str | list[TResponseInputItem],
session: Session | None = None,
run_config: RunConfig | None = None,
context: TContext | None = None,
stream_queue_size: int = 1000,
) -> RunResultStreaming[TContext]:
"""流式执行的核心算法"""
# 1) 创建异步队列用于流式事件传输
event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = asyncio.Queue(
maxsize=stream_queue_size
)
# 2) 启动异步任务执行代理逻辑
async def _execute_agent():
"""后台执行代理逻辑并生成流式事件"""
try:
# 执行标准的代理运行逻辑
result = await run(agent, input, session, run_config, context)
# 生成最终完成事件
if result.new_agent != agent:
# 代理发生了更新
agent_update_event = AgentUpdatedStreamEvent(
new_agent=result.new_agent
)
event_queue.put_nowait(agent_update_event)
return result
except Exception as e:
# 3) 异常处理:将异常转换为错误事件
error_event = create_error_stream_event(e)
event_queue.put_nowait(error_event)
raise
finally:
# 4) 发送完成信号
event_queue.put_nowait(QueueCompleteSentinel())
# 5) 启动后台执行任务
execution_task = asyncio.create_task(_execute_agent())
# 6) 创建流式结果对象
streaming_result = RunResultStreaming(
final_output_task=execution_task,
event_queue=event_queue,
agent=agent,
context=context
)
return streaming_result
class RunResultStreaming:
"""流式结果的处理类"""
def __init__(
self,
final_output_task: asyncio.Task,
event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
agent: Agent,
context: Any
):
self.final_output_task = final_output_task
self.event_queue = event_queue
self.agent = agent
self.context = context
self._final_result = None
async def stream_events(self) -> AsyncIterator[StreamEvent]:
"""异步生成器:流式产生事件"""
while True:
try:
# 1) 从队列获取事件(阻塞直到有事件或超时)
event = await asyncio.wait_for(
self.event_queue.get(),
timeout=30.0 # 30秒超时
)
# 2) 检查是否为完成信号
if isinstance(event, QueueCompleteSentinel):
break
# 3) 产生流式事件
yield event
# 4) 标记队列任务完成
self.event_queue.task_done()
except asyncio.TimeoutError:
# 超时处理:检查后台任务状态
if self.final_output_task.done():
break
else:
# 继续等待
continue
except Exception as e:
logger.error(f"Error in stream_events: {e}")
break
@property
async def final_output(self) -> str:
"""获取最终输出结果"""
if self._final_result is None:
self._final_result = await self.final_output_task
return self._final_result.final_output
算法目的: 提供高效的流式执行机制,支持实时事件流和最终结果的异步获取。
流式处理特点:
- 并发执行:后台任务执行代理逻辑,主线程处理流式事件
- 队列缓冲:异步队列提供缓冲和背压控制
- 超时机制:合理的超时处理避免无限等待
- 异常传播:执行异常通过事件机制传播到消费者
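在消费端,还可以为整个事件流设置一个总超时,避免异常情况下无限等待。下面是一个假设性的示意(result 假定来自 Runner.run_streamed()):
import asyncio

async def consume_with_deadline(result, deadline_s: float = 60.0) -> None:
    """在 deadline_s 秒内消费完所有流式事件,否则放弃剩余事件。"""

    async def _consume() -> None:
        async for event in result.stream_events():
            print(event.type)  # 实际应用中在此分发事件

    try:
        await asyncio.wait_for(_consume(), timeout=deadline_s)
    except asyncio.TimeoutError:
        print("流式消费超时,停止等待剩余事件")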
3.3 实时会话流式处理算法
class RealtimeSession(RealtimeModelListener):
"""实时会话的流式事件处理"""
def __init__(self, agent: RealtimeAgent, **kwargs):
self._agent = agent
self._event_queue: asyncio.Queue[RealtimeSessionEvent] = asyncio.Queue()
self._session_active = False
async def start_session(self) -> AsyncIterator[RealtimeSessionEvent]:
"""启动实时会话并产生流式事件"""
try:
# 1) 启动会话
self._session_active = True
# 发送会话开始事件
start_event = RealtimeAgentStartEvent(
agent=self._agent,
info=self._create_event_info()
)
yield start_event
# 2) 持续处理模型事件并转换为会话事件
while self._session_active:
try:
# 从模型获取原始事件
model_event = await asyncio.wait_for(
self._model.receive_event(),
timeout=5.0
)
# 转换为会话事件
session_events = await self._process_model_event(model_event)
# 产生所有转换后的事件
for event in session_events:
yield event
except asyncio.TimeoutError:
# 超时检查会话状态
if not self._session_active:
break
continue
except Exception as e:
# 产生错误事件
error_event = RealtimeError(
error=str(e),
info=self._create_event_info()
)
yield error_event
break
finally:
# 3) 会话结束处理
self._session_active = False
end_event = RealtimeAgentEndEvent(
agent=self._agent,
info=self._create_event_info()
)
yield end_event
async def _process_model_event(
self,
model_event: RealtimeModelEvent
) -> list[RealtimeSessionEvent]:
"""处理模型事件并转换为会话事件"""
events = []
if model_event.type == "response.audio.delta":
# 音频增量事件
audio_event = RealtimeAudio(
audio_data=model_event.delta,
info=self._create_event_info()
)
events.append(audio_event)
elif model_event.type == "response.audio.done":
# 音频完成事件
audio_end_event = RealtimeAudioEnd(
final_audio=model_event.data,
info=self._create_event_info()
)
events.append(audio_end_event)
elif model_event.type == "response.function_call_arguments.delta":
# 工具调用参数增量
if not hasattr(self, '_current_tool_call'):
# 开始新的工具调用
tool_start_event = RealtimeToolStart(
tool_name=model_event.name,
call_id=model_event.call_id,
info=self._create_event_info()
)
events.append(tool_start_event)
self._current_tool_call = model_event.call_id
elif model_event.type == "response.function_call_arguments.done":
# 工具调用完成
if hasattr(self, '_current_tool_call'):
tool_end_event = RealtimeToolEnd(
tool_name=model_event.name,
call_id=model_event.call_id,
arguments=model_event.arguments,
info=self._create_event_info()
)
events.append(tool_end_event)
delattr(self, '_current_tool_call')
elif model_event.type == "conversation.item.created":
# 对话项创建
history_event = RealtimeHistoryAdded(
item=model_event.item,
info=self._create_event_info()
)
events.append(history_event)
else:
# 其他原始事件直接转发
raw_event = RealtimeRawModelEvent(
event=model_event,
info=self._create_event_info()
)
events.append(raw_event)
return events
def _create_event_info(self) -> EventInfo:
"""创建事件信息上下文"""
return EventInfo(
session_id=self._session_id,
agent_name=self._agent.name,
timestamp=time.time()
)
算法目的: 处理实时会话中的复杂事件流,提供低延迟的实时交互体验。
实时处理特点:
- 事件转换:将模型原始事件转换为语义化的会话事件
- 状态跟踪:跟踪工具调用、音频处理等状态变化
- 并发安全:在异步环境中安全处理多种类型的事件
- 错误隔离:单个事件的处理错误不影响整个会话流
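上面的示例用单个 _current_tool_call 属性跟踪工具调用;若模型可能并发发起多个调用,可以改用按 call_id 索引的字典来维护状态。下面是一个假设性的示意(非 SDK 源码):
class ToolCallTracker:
    """按 call_id 跟踪进行中的工具调用(示意实现)。"""

    def __init__(self) -> None:
        self._pending: dict[str, str] = {}  # call_id -> 已累积的参数片段

    def on_arguments_delta(self, call_id: str, delta: str) -> bool:
        """累积参数增量;返回 True 表示这是该调用的第一个增量,可据此发出 tool_start 事件。"""
        is_new = call_id not in self._pending
        self._pending[call_id] = self._pending.get(call_id, "") + delta
        return is_new

    def on_arguments_done(self, call_id: str) -> str:
        """调用完成,取出完整参数并清理状态,可据此发出 tool_end 事件。"""
        return self._pending.pop(call_id, "")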
3.4 语音流式处理算法
class VoicePipeline:
"""语音处理流水线的流式事件生成"""
def __init__(self, config: VoiceConfig):
self.config = config
self._audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
self._event_queue: asyncio.Queue[VoiceStreamEvent] = asyncio.Queue()
async def process_voice_stream(
self,
audio_input: AsyncIterator[bytes]
) -> AsyncIterator[VoiceStreamEvent]:
"""处理语音输入流并产生语音事件流"""
try:
# 1) 发送会话开始事件
start_event = VoiceStreamEventLifecycle(
event="turn_started"
)
yield start_event
# 2) 启动音频处理任务
processing_task = asyncio.create_task(
self._process_audio_stream(audio_input)
)
# 3) 产生流式事件
while not processing_task.done():
try:
# 从事件队列获取语音事件
voice_event = await asyncio.wait_for(
self._event_queue.get(),
timeout=1.0
)
yield voice_event
except asyncio.TimeoutError:
# 超时检查处理任务状态
continue
except Exception as e:
# 产生错误事件
error_event = VoiceStreamEventError(
error=str(e)
)
yield error_event
break
# 4) 等待处理任务完成
await processing_task
except Exception as e:
# 全局错误处理
error_event = VoiceStreamEventError(
error=f"Voice pipeline error: {e}"
)
yield error_event
finally:
# 5) 发送会话结束事件
end_event = VoiceStreamEventLifecycle(
event="turn_ended"
)
yield end_event
async def _process_audio_stream(self, audio_input: AsyncIterator[bytes]):
"""处理音频流的后台任务"""
audio_buffer = bytearray()
async for audio_chunk in audio_input:
# 1) 累积音频数据
audio_buffer.extend(audio_chunk)
# 2) 检查是否有足够数据进行处理
if len(audio_buffer) >= self.config.min_chunk_size:
# 处理音频数据
processed_audio = await self._process_audio_chunk(
bytes(audio_buffer[:self.config.min_chunk_size])
)
# 创建音频事件
audio_event = VoiceStreamEventAudio(
audio_data=processed_audio,
sample_rate=self.config.sample_rate,
channels=self.config.channels
)
# 将事件放入队列
await self._event_queue.put(audio_event)
# 清理已处理的数据
audio_buffer = audio_buffer[self.config.min_chunk_size:]
# 处理剩余的音频数据
if audio_buffer:
final_audio = await self._process_audio_chunk(bytes(audio_buffer))
final_event = VoiceStreamEventAudio(
audio_data=final_audio,
sample_rate=self.config.sample_rate,
channels=self.config.channels
)
await self._event_queue.put(final_event)
async def _process_audio_chunk(self, audio_data: bytes) -> bytes:
"""处理单个音频块"""
# 1) 音频预处理(降噪、规范化等)
preprocessed_audio = self._preprocess_audio(audio_data)
# 2) 语音识别(如果需要)
if self.config.enable_speech_recognition:
transcription = await self._transcribe_audio(preprocessed_audio)
# 可以产生转录事件
# 3) 音频后处理
processed_audio = self._postprocess_audio(preprocessed_audio)
return processed_audio
def _preprocess_audio(self, audio_data: bytes) -> bytes:
"""音频预处理"""
# 实现降噪、音量调节等预处理逻辑
return audio_data
def _postprocess_audio(self, audio_data: bytes) -> bytes:
"""音频后处理"""
# 实现音频格式转换、压缩等后处理逻辑
return audio_data
async def _transcribe_audio(self, audio_data: bytes) -> str:
"""语音转文本"""
# 调用语音识别服务
# 这里是示例实现
return "transcribed_text"
算法目的: 处理语音输入的流式数据,提供实时的语音处理和事件生成能力。
语音流式处理特点:
- 音频缓冲:合理的音频缓冲区管理平衡延迟和质量
- 并行处理:音频处理和事件生成并行进行
- 实时约束:满足语音交互的实时性要求
- 错误恢复:音频处理错误的优雅恢复机制
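其中 min_chunk_size 的取值通常由采样率、声道数、样本位宽和目标延迟共同决定,可按下面的方式估算(数值仅为假设示例):
def min_chunk_size_bytes(
    sample_rate: int,         # 采样率,例如 24000 Hz
    channels: int,            # 声道数,例如 1
    sample_width: int,        # 每个样本的字节数,16bit 即 2
    target_latency_s: float,  # 目标分块延迟,例如 0.1 秒
) -> int:
    """估算满足目标延迟所需累积的最小音频字节数。"""
    return int(sample_rate * channels * sample_width * target_latency_s)

# 24kHz、单声道、16bit、100ms 延迟约需 4800 字节
assert min_chunk_size_bytes(24000, 1, 2, 0.1) == 4800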
4. 数据结构与UML图
classDiagram
class StreamEvent {
<<Union Type>>
RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent
}
class RawResponsesStreamEvent {
+TResponseStreamEvent data
+type: "raw_response_event"
}
class RunItemStreamEvent {
+str name
+RunItem item
+type: "run_item_stream_event"
}
class AgentUpdatedStreamEvent {
+Agent[Any] new_agent
+type: "agent_updated_stream_event"
}
class RunResultStreaming {
+asyncio.Task final_output_task
+asyncio.Queue event_queue
+Agent agent
+Any context
-Any _final_result
+stream_events() AsyncIterator[StreamEvent]
+final_output() str
+wait_for_completion() RunResult
}
class RealtimeSessionEvent {
<<Union Type>>
RealtimeAgentStartEvent | RealtimeAgentEndEvent | RealtimeHandoffEvent | ...
}
class RealtimeAgentStartEvent {
+RealtimeAgent agent
+EventInfo info
+type: "agent_start"
}
class RealtimeAgentEndEvent {
+RealtimeAgent agent
+EventInfo info
+type: "agent_end"
}
class RealtimeHandoffEvent {
+RealtimeAgent from_agent
+RealtimeAgent to_agent
+EventInfo info
+type: "handoff"
}
class RealtimeToolStart {
+str tool_name
+str call_id
+EventInfo info
+type: "tool_start"
}
class RealtimeToolEnd {
+str tool_name
+str call_id
+str arguments
+EventInfo info
+type: "tool_end"
}
class RealtimeAudio {
+bytes audio_data
+EventInfo info
+type: "audio"
}
class RealtimeAudioEnd {
+bytes final_audio
+EventInfo info
+type: "audio_end"
}
class RealtimeError {
+str error
+EventInfo info
+type: "error"
}
class VoiceStreamEvent {
<<Union Type>>
VoiceStreamEventAudio | VoiceStreamEventLifecycle | VoiceStreamEventError
}
class VoiceStreamEventAudio {
+bytes audio_data
+int sample_rate
+int channels
+type: "voice_stream_event_audio"
}
class VoiceStreamEventLifecycle {
+str event
+type: "voice_stream_event_lifecycle"
}
class VoiceStreamEventError {
+str error
+Any? details
+type: "voice_stream_event_error"
}
class QueueCompleteSentinel {
<<Marker Class>>
用于标识队列完成的哨兵对象
}
class EventInfo {
+str session_id
+str agent_name
+float timestamp
+dict? metadata
}
class RunItem {
<<Abstract>>
+Agent agent
+T raw_item
+to_input_item() TResponseInputItem
}
class MessageOutputItem {
+Agent agent
+ResponseOutputMessage raw_item
+content() str
+role() str
}
class ToolCallItem {
+Agent agent
+ResponseFunctionToolCall raw_item
+tool_name() str
+arguments() dict
}
class ToolCallOutputItem {
+Agent agent
+FunctionCallOutput raw_item
+output() Any
+success() bool
}
class HandoffCallItem {
+Agent agent
+Any raw_item
+target_agent() str
+reason() str
}
class HandoffOutputItem {
+Agent agent
+Any raw_item
+source_agent() Agent
+target_agent() Agent
}
StreamEvent --> RawResponsesStreamEvent
StreamEvent --> RunItemStreamEvent
StreamEvent --> AgentUpdatedStreamEvent
RunItemStreamEvent --> RunItem : contains
RealtimeSessionEvent --> RealtimeAgentStartEvent
RealtimeSessionEvent --> RealtimeAgentEndEvent
RealtimeSessionEvent --> RealtimeHandoffEvent
RealtimeSessionEvent --> RealtimeToolStart
RealtimeSessionEvent --> RealtimeToolEnd
RealtimeSessionEvent --> RealtimeAudio
RealtimeSessionEvent --> RealtimeAudioEnd
RealtimeSessionEvent --> RealtimeError
VoiceStreamEvent --> VoiceStreamEventAudio
VoiceStreamEvent --> VoiceStreamEventLifecycle
VoiceStreamEvent --> VoiceStreamEventError
RealtimeAgentStartEvent --> EventInfo : uses
RealtimeAgentEndEvent --> EventInfo : uses
RealtimeHandoffEvent --> EventInfo : uses
RealtimeToolStart --> EventInfo : uses
RealtimeToolEnd --> EventInfo : uses
RealtimeAudio --> EventInfo : uses
RealtimeAudioEnd --> EventInfo : uses
RealtimeError --> EventInfo : uses
RunResultStreaming --> StreamEvent : produces
RunResultStreaming --> QueueCompleteSentinel : uses
RunItem <|-- MessageOutputItem
RunItem <|-- ToolCallItem
RunItem <|-- ToolCallOutputItem
RunItem <|-- HandoffCallItem
RunItem <|-- HandoffOutputItem
note for StreamEvent "统一的流式事件接口<br/>支持多种事件类型"
note for RunResultStreaming "流式结果管理<br/>异步事件生成和最终结果获取"
note for RealtimeSessionEvent "实时会话专用事件<br/>支持语音和实时交互"
note for VoiceStreamEvent "语音流处理事件<br/>音频数据和生命周期管理"
类图说明:
事件类型层次结构
- StreamEvent:通用流式事件的联合类型,包含所有基础事件类型
- RealtimeSessionEvent:实时会话专用事件,支持语音和实时交互场景
- VoiceStreamEvent:语音处理专用事件,处理音频数据和生命周期
- RunItem:运行项目的基础类型,被包装为流式事件
数据封装设计
- 事件封装:所有事件都包含类型标识和相关数据
- 上下文信息:EventInfo提供事件的上下文元数据
- 数据载荷:每个事件类型携带特定的数据载荷
- 类型安全:通过联合类型和字面量类型确保类型安全
生命周期管理
- 队列哨兵:QueueCompleteSentinel标识流式处理的完成
- 任务管理:RunResultStreaming管理异步任务和事件流
- 状态跟踪:实时事件包含详细的状态和上下文信息
- 错误处理:专门的错误事件类型处理异常情况
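在消费端,可以利用 type 字面量字段对 StreamEvent 联合类型做分支收窄。下面是一个假设性的示意(event 假定来自 stream_events(),使用 Python 3.10+ 的 match 语法):
def describe_event(event) -> str:
    """按 type 字面量字段区分联合类型中的具体事件。"""
    match event.type:
        case "raw_response_event":
            return "原始响应事件"
        case "run_item_stream_event":
            return f"运行项事件: {event.name}"
        case "agent_updated_stream_event":
            return f"代理更新: {event.new_agent.name}"
        case _:
            return "未知事件类型"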
5. 典型使用场景时序图
场景一:基础流式代理执行
sequenceDiagram
autonumber
participant App as 应用代码
participant Runner as Runner
participant RunResultStreaming as 流式结果
participant Queue as 异步队列
participant RunImpl as 执行引擎
participant Agent as 代理
participant Model as 模型
participant Tools as 工具
App->>Runner: run_streamed(agent, input)
Runner->>Queue: 创建异步队列 Queue(maxsize=1000)
Runner->>RunResultStreaming: 创建流式结果对象
Runner->>RunImpl: 启动后台执行任务
RunImpl->>Agent: 开始代理执行
par 后台执行流程
Agent->>Model: 调用模型生成响应
Model-->>Agent: 返回模型响应
Agent->>RunImpl: 处理模型响应
RunImpl->>RunImpl: 创建 MessageOutputItem
RunImpl->>Queue: put_nowait(RunItemStreamEvent("message_output_created"))
Agent->>Tools: 调用工具函数
RunImpl->>Queue: put_nowait(RunItemStreamEvent("tool_called"))
Tools-->>Agent: 返回工具结果
RunImpl->>Queue: put_nowait(RunItemStreamEvent("tool_output"))
Agent->>Agent: 完成执行
RunImpl->>Queue: put_nowait(QueueCompleteSentinel())
and 前台流式处理
App->>RunResultStreaming: stream_events()
loop 处理流式事件
RunResultStreaming->>Queue: get() 获取事件
Queue-->>RunResultStreaming: RunItemStreamEvent
RunResultStreaming-->>App: yield StreamEvent
App->>App: 处理流式事件
alt 消息输出事件
App->>App: 更新UI显示增量文本
else 工具调用事件
App->>App: 显示工具调用状态
else 工具输出事件
App->>App: 显示工具执行结果
end
end
RunResultStreaming->>Queue: get() 获取完成信号
Queue-->>RunResultStreaming: QueueCompleteSentinel
RunResultStreaming-->>App: 流式事件结束
end
App->>RunResultStreaming: await final_output
RunResultStreaming-->>App: 返回最终结果
note over App, Tools: 并行处理:后台执行生成事件<br/>前台消费事件实时更新UI
场景二:实时会话流式处理
sequenceDiagram
autonumber
participant Client as 客户端
participant RealtimeSession as 实时会话
participant Model as 实时模型
participant EventProcessor as 事件处理器
participant AudioProcessor as 音频处理器
participant ToolManager as 工具管理
Client->>RealtimeSession: start_session()
RealtimeSession->>Model: 连接实时模型
Model-->>RealtimeSession: 连接建立
RealtimeSession->>Client: yield RealtimeAgentStartEvent
loop 实时交互循环
par 音频输入处理
Client->>RealtimeSession: 发送音频输入
RealtimeSession->>Model: 转发音频数据
Model->>Model: 实时语音识别
Model-->>RealtimeSession: response.audio.delta 事件
RealtimeSession->>EventProcessor: 处理音频增量事件
EventProcessor->>AudioProcessor: 处理音频数据
AudioProcessor-->>EventProcessor: 处理后的音频
EventProcessor->>RealtimeSession: RealtimeAudio 事件
RealtimeSession->>Client: yield RealtimeAudio(音频数据)
Client->>Client: 实时播放音频响应
and 工具调用处理
Model-->>RealtimeSession: response.function_call_arguments.delta
RealtimeSession->>EventProcessor: 处理工具调用增量
alt 工具调用开始
EventProcessor->>RealtimeSession: RealtimeToolStart 事件
RealtimeSession->>Client: yield RealtimeToolStart
Client->>Client: 显示工具调用状态
end
Model-->>RealtimeSession: response.function_call_arguments.done
RealtimeSession->>ToolManager: 执行工具调用
ToolManager->>ToolManager: 执行具体工具
ToolManager-->>RealtimeSession: 工具执行结果
RealtimeSession->>EventProcessor: 创建工具完成事件
EventProcessor->>RealtimeSession: RealtimeToolEnd 事件
RealtimeSession->>Client: yield RealtimeToolEnd
Client->>Client: 显示工具执行结果
and 会话状态管理
Model-->>RealtimeSession: conversation.item.created
RealtimeSession->>EventProcessor: 处理对话历史更新
EventProcessor->>RealtimeSession: RealtimeHistoryAdded 事件
RealtimeSession->>Client: yield RealtimeHistoryAdded
Client->>Client: 更新对话历史UI
end
alt 会话中断或错误
Model-->>RealtimeSession: error 事件
RealtimeSession->>EventProcessor: 处理错误事件
EventProcessor->>RealtimeSession: RealtimeError 事件
RealtimeSession->>Client: yield RealtimeError
Client->>Client: 处理错误并显示
break 退出循环
end
end
RealtimeSession->>Model: 断开连接
RealtimeSession->>Client: yield RealtimeAgentEndEvent
Client->>Client: 清理会话资源
note over Client, ToolManager: 实时处理:多种事件类型并行处理<br/>音频、工具调用、状态更新同时进行
场景三:语音流式处理管道
sequenceDiagram
autonumber
participant VoiceApp as 语音应用
participant VoicePipeline as 语音管道
participant AudioInput as 音频输入流
participant AudioProcessor as 音频处理器
participant EventQueue as 事件队列
participant SpeechRecognition as 语音识别
participant AudioOutput as 音频输出
VoiceApp->>VoicePipeline: process_voice_stream(audio_input)
VoicePipeline->>EventQueue: 创建事件队列
VoicePipeline->>VoiceApp: yield VoiceStreamEventLifecycle("turn_started")
VoicePipeline->>AudioProcessor: 启动音频处理任务
par 音频输入处理
loop 音频流处理
AudioInput->>VoicePipeline: 音频数据块
VoicePipeline->>AudioProcessor: 累积音频数据
alt 数据块大小足够
AudioProcessor->>AudioProcessor: _preprocess_audio() 音频预处理
AudioProcessor->>AudioProcessor: 降噪、音量调节
AudioProcessor->>SpeechRecognition: _transcribe_audio() 语音识别
SpeechRecognition->>SpeechRecognition: 转换语音为文本
SpeechRecognition-->>AudioProcessor: 转录文本
AudioProcessor->>AudioProcessor: _postprocess_audio() 音频后处理
AudioProcessor->>AudioProcessor: 格式转换、压缩
AudioProcessor->>EventQueue: put(VoiceStreamEventAudio)
end
end
AudioInput->>VoicePipeline: 音频流结束
AudioProcessor->>AudioProcessor: 处理剩余音频数据
AudioProcessor->>EventQueue: put(最终音频事件)
and 事件流输出
loop 事件处理循环
VoicePipeline->>EventQueue: get() 获取语音事件
EventQueue-->>VoicePipeline: VoiceStreamEventAudio
VoicePipeline->>VoiceApp: yield VoiceStreamEventAudio
VoiceApp->>AudioOutput: 播放或处理音频数据
VoiceApp->>VoiceApp: 更新语音UI状态
end
end
alt 处理异常
AudioProcessor->>AudioProcessor: 发生音频处理错误
AudioProcessor->>EventQueue: put(VoiceStreamEventError)
VoicePipeline->>VoiceApp: yield VoiceStreamEventError
VoiceApp->>VoiceApp: 显示错误信息
VoiceApp->>VoiceApp: 恢复或重启语音处理
end
VoicePipeline->>VoiceApp: yield VoiceStreamEventLifecycle("turn_ended")
VoiceApp->>VoiceApp: 清理语音会话资源
note over VoiceApp, AudioOutput: 语音流水线:并行处理音频输入<br/>实时生成音频事件和转录结果
6. 最佳实践与使用模式
6.1 基础流式代理使用
from agents import Agent, Runner
import asyncio
async def basic_streaming_example():
"""基础流式代理使用示例"""
# 创建代理
agent = Agent(
name="StreamingAssistant",
instructions="你是一个支持流式响应的助手,请详细回答用户问题。",
model="gpt-4o"
)
# 执行流式运行
result = Runner.run_streamed(
agent,
"请详细解释机器学习的工作原理,包括训练和推理过程。",
stream_queue_size=500 # 设置事件队列大小
)
# 处理流式事件
accumulated_content = ""
try:
async for event in result.stream_events():
if event.type == "run_item_stream_event":
if event.name == "message_output_created":
# 处理消息输出事件
message_item = event.item
content = message_item.content
# 计算增量内容
if content != accumulated_content:
delta = content[len(accumulated_content):]
print(delta, end="", flush=True)
accumulated_content = content
elif event.name == "tool_called":
# 处理工具调用事件
tool_item = event.item
print(f"\n[工具调用: {tool_item.tool_name}]")
print(f"参数: {tool_item.arguments}")
elif event.name == "tool_output":
# 处理工具输出事件
output_item = event.item
print(f"[工具结果: {output_item.output}]")
elif event.name == "handoff_requested":
# 处理交接请求事件
handoff_item = event.item
print(f"\n[代理交接: {handoff_item.target_agent}]")
elif event.name == "reasoning_item_created":
# 处理推理过程事件
reasoning_item = event.item
print(f"\n[推理过程: {reasoning_item.content}]")
elif event.type == "agent_updated_stream_event":
# 处理代理更新事件
print(f"\n[代理切换: {event.new_agent.name}]")
elif event.type == "raw_response_event":
# 处理原始响应事件(通常用于调试)
raw_data = event.data
print(f"[原始事件: {raw_data.type if hasattr(raw_data, 'type') else 'unknown'}]")
except Exception as e:
print(f"\n流式处理错误: {e}")
# 获取最终结果
try:
final_output = await result.final_output
print(f"\n\n=== 最终输出 ===\n{final_output}")
except Exception as e:
print(f"获取最终结果错误: {e}")
# 运行示例
asyncio.run(basic_streaming_example())
6.2 实时会话流式处理
from agents.realtime import RealtimeAgent, RealtimeSession
import asyncio
class RealtimeStreamingHandler:
"""实时流式处理器"""
def __init__(self):
self.session_active = False
self.current_audio_buffer = bytearray()
async def handle_realtime_session(self, agent: RealtimeAgent):
"""处理实时会话流式事件"""
session = RealtimeSession(agent)
try:
self.session_active = True
print("🎙️ 实时会话开始...")
async for event in session.start_session():
if event.type == "agent_start":
print(f"✅ 代理启动: {event.agent.name}")
await self._setup_audio_input(session)
elif event.type == "agent_end":
print(f"⏹️ 代理结束: {event.agent.name}")
self.session_active = False
elif event.type == "audio":
# 处理实时音频数据
await self._handle_audio_data(event.audio_data)
elif event.type == "audio_end":
# 处理音频完成
await self._handle_audio_complete(event.final_audio)
elif event.type == "tool_start":
print(f"🔧 工具调用开始: {event.tool_name}")
print(f" 调用ID: {event.call_id}")
elif event.type == "tool_end":
print(f"✅ 工具调用完成: {event.tool_name}")
print(f" 参数: {event.arguments}")
elif event.type == "handoff":
print(f"🔄 代理交接: {event.from_agent.name} → {event.to_agent.name}")
# 可以在这里处理代理切换的UI更新
elif event.type == "error":
print(f"❌ 会话错误: {event.error}")
await self._handle_session_error(event)
elif event.type == "history_added":
print(f"📝 对话历史更新")
await self._update_conversation_ui(event.item)
elif event.type == "raw_model_event":
# 处理原始模型事件(调试用)
print(f"🔍 原始事件: {event.event.type}")
# 检查会话是否应该继续
if not self.session_active:
break
except Exception as e:
print(f"实时会话处理错误: {e}")
finally:
await self._cleanup_session()
async def _setup_audio_input(self, session: RealtimeSession):
"""设置音频输入"""
print("🎵 配置音频输入...")
# 这里可以配置麦克风、音频格式等
async def _handle_audio_data(self, audio_data: bytes):
"""处理实时音频数据"""
self.current_audio_buffer.extend(audio_data)
# 实时播放音频(示例)
print(f"🔊 接收音频数据: {len(audio_data)} 字节")
# 可以在这里实现实时音频播放
# await self._play_audio_chunk(audio_data)
async def _handle_audio_complete(self, final_audio: bytes):
"""处理音频完成"""
print(f"🎵 音频完成,总长度: {len(final_audio)} 字节")
# 保存或进一步处理完整音频
# await self._save_audio(final_audio)
# 清空缓冲区
self.current_audio_buffer.clear()
async def _handle_session_error(self, error_event):
"""处理会话错误"""
print(f"处理会话错误: {error_event.error}")
# 可以实现错误恢复逻辑
# 例如:重连、重启会话等
async def _update_conversation_ui(self, conversation_item):
"""更新对话界面"""
print(f"更新对话UI: {conversation_item}")
# 实现UI更新逻辑
async def _cleanup_session(self):
"""清理会话资源"""
print("🧹 清理会话资源...")
self.current_audio_buffer.clear()
# 使用示例
async def realtime_streaming_example():
"""实时流式处理示例"""
# 创建实时代理
realtime_agent = RealtimeAgent(
name="RealtimeVoiceAssistant",
instructions="你是一个语音助手,请用自然的语调回答用户问题。",
voice="nova", # 设置语音
model="gpt-4o-realtime-preview"
)
# 创建处理器
handler = RealtimeStreamingHandler()
# 启动实时会话
await handler.handle_realtime_session(realtime_agent)
# 运行实时流式示例
# asyncio.run(realtime_streaming_example())
6.3 自定义流式事件处理器
from agents import Agent, Runner, StreamEvent
from typing import Dict, List, Callable, Any
import asyncio
import json
from datetime import datetime
class StreamEventProcessor:
"""自定义流式事件处理器"""
def __init__(self):
self.event_handlers: Dict[str, List[Callable]] = {}
self.event_history: List[Dict] = []
self.metrics: Dict[str, Any] = {
"total_events": 0,
"events_by_type": {},
"processing_errors": 0,
"start_time": datetime.now()
}
def register_handler(self, event_type: str, handler: Callable):
"""注册事件处理器"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
def register_pattern_handler(self, pattern: str, handler: Callable):
"""注册模式匹配处理器"""
def pattern_wrapper(event):
if hasattr(event, 'name') and pattern in event.name:
return handler(event)
elif hasattr(event, 'type') and pattern in event.type:
return handler(event)
self.register_handler("*", pattern_wrapper)
async def process_stream(self, event_stream) -> None:
"""处理事件流"""
try:
async for event in event_stream:
await self._process_single_event(event)
except Exception as e:
print(f"流式处理错误: {e}")
self.metrics["processing_errors"] += 1
finally:
await self._finalize_processing()
async def _process_single_event(self, event: StreamEvent):
"""处理单个事件"""
try:
# 更新指标
self.metrics["total_events"] += 1
event_type = getattr(event, 'type', 'unknown')
self.metrics["events_by_type"][event_type] = (
self.metrics["events_by_type"].get(event_type, 0) + 1
)
# 记录事件历史
event_record = {
"timestamp": datetime.now().isoformat(),
"type": event_type,
"data": self._serialize_event(event)
}
self.event_history.append(event_record)
# 调用注册的处理器
            handlers = list(self.event_handlers.get(event_type, []))
            handlers.extend(self.event_handlers.get("*", []))  # 通用处理器(先复制再合并,避免修改注册表)
for handler in handlers:
try:
result = handler(event)
if asyncio.iscoroutine(result):
await result
except Exception as e:
print(f"事件处理器错误: {e}")
self.metrics["processing_errors"] += 1
except Exception as e:
print(f"事件处理错误: {e}")
self.metrics["processing_errors"] += 1
def _serialize_event(self, event: StreamEvent) -> Dict:
"""序列化事件数据"""
try:
if hasattr(event, '__dict__'):
return {k: str(v) for k, v in event.__dict__.items()}
else:
return {"raw": str(event)}
except Exception:
return {"error": "serialization_failed"}
async def _finalize_processing(self):
"""完成处理后的清理工作"""
duration = datetime.now() - self.metrics["start_time"]
self.metrics["total_duration_seconds"] = duration.total_seconds()
print("\n=== 流式处理统计 ===")
print(f"总事件数: {self.metrics['total_events']}")
print(f"处理时长: {self.metrics['total_duration_seconds']:.2f}秒")
print(f"平均事件率: {self.metrics['total_events']/self.metrics['total_duration_seconds']:.2f} 事件/秒")
print(f"处理错误: {self.metrics['processing_errors']}")
print("\n事件类型统计:")
for event_type, count in self.metrics["events_by_type"].items():
print(f" {event_type}: {count}")
def get_metrics(self) -> Dict:
"""获取处理指标"""
return self.metrics.copy()
def get_event_history(self, limit: int = None) -> List[Dict]:
"""获取事件历史"""
if limit:
return self.event_history[-limit:]
return self.event_history.copy()
# 使用自定义处理器的示例
async def custom_processor_example():
"""自定义处理器使用示例"""
# 创建处理器
processor = StreamEventProcessor()
# 注册消息输出处理器
def handle_message_output(event):
if hasattr(event, 'item') and hasattr(event.item, 'content'):
content = event.item.content
print(f"📝 消息: {content[:100]}{'...' if len(content) > 100 else ''}")
processor.register_handler("run_item_stream_event", handle_message_output)
# 注册工具调用处理器
async def handle_tool_events(event):
if hasattr(event, 'name'):
if event.name == "tool_called":
print(f"🔧 工具调用: {event.item.tool_name}")
# 可以记录到数据库、发送通知等
await asyncio.sleep(0.01) # 模拟异步处理
elif event.name == "tool_output":
print(f"✅ 工具完成: {event.item.tool_name}")
processor.register_pattern_handler("tool", handle_tool_events)
# 注册代理更新处理器
def handle_agent_update(event):
print(f"🔄 代理更新: {event.new_agent.name}")
processor.register_handler("agent_updated_stream_event", handle_agent_update)
# 创建测试代理
agent = Agent(
name="TestAgent",
instructions="你是一个测试代理,请简单回答问题。"
)
# 执行流式运行
result = Runner.run_streamed(agent, "请简单介绍一下Python。")
# 使用自定义处理器处理事件流
await processor.process_stream(result.stream_events())
# 获取最终结果
final_output = await result.final_output
print(f"\n最终输出: {final_output}")
# 查看处理指标
metrics = processor.get_metrics()
print(f"\n处理指标: {json.dumps(metrics, indent=2, default=str)}")
# 运行自定义处理器示例
# asyncio.run(custom_processor_example())
6.4 流式事件的错误处理和重试
from agents import Agent, Runner, StreamEvent
import asyncio
import logging
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class EventProcessingStrategy(Enum):
FAIL_FAST = "fail_fast" # 遇到错误立即失败
SKIP_ERROR = "skip_error" # 跳过错误事件继续处理
RETRY_ON_ERROR = "retry_on_error" # 重试错误事件
@dataclass
class RetryConfig:
max_retries: int = 3
initial_delay: float = 1.0
max_delay: float = 10.0
backoff_factor: float = 2.0
class RobustStreamProcessor:
"""健壮的流式事件处理器"""
def __init__(
self,
strategy: EventProcessingStrategy = EventProcessingStrategy.SKIP_ERROR,
retry_config: Optional[RetryConfig] = None
):
self.strategy = strategy
self.retry_config = retry_config or RetryConfig()
self.logger = logging.getLogger(__name__)
# 错误统计
self.error_stats: Dict[str, int] = {}
self.failed_events: List[Dict] = []
self.retry_attempts: Dict[str, int] = {}
async def process_stream_with_resilience(
self,
event_stream,
event_handler
) -> Dict[str, Any]:
"""带弹性的流式事件处理"""
processed_count = 0
error_count = 0
skipped_count = 0
try:
async for event in event_stream:
try:
await self._process_event_with_strategy(event, event_handler)
processed_count += 1
except Exception as e:
error_count += 1
await self._handle_processing_error(event, e)
if self.strategy == EventProcessingStrategy.FAIL_FAST:
raise
elif self.strategy == EventProcessingStrategy.SKIP_ERROR:
skipped_count += 1
continue
except Exception as e:
self.logger.error(f"流式处理致命错误: {e}")
raise
return {
"processed": processed_count,
"errors": error_count,
"skipped": skipped_count,
"error_stats": self.error_stats,
"failed_events_count": len(self.failed_events)
}
async def _process_event_with_strategy(self, event: StreamEvent, handler):
"""根据策略处理事件"""
if self.strategy == EventProcessingStrategy.RETRY_ON_ERROR:
await self._process_with_retry(event, handler)
else:
await self._process_once(event, handler)
async def _process_once(self, event: StreamEvent, handler):
"""单次处理事件"""
try:
result = handler(event)
if asyncio.iscoroutine(result):
await result
except Exception as e:
self._record_error(event, e)
raise
async def _process_with_retry(self, event: StreamEvent, handler):
"""带重试的事件处理"""
event_id = self._get_event_id(event)
retry_count = 0
last_error = None
while retry_count <= self.retry_config.max_retries:
try:
result = handler(event)
if asyncio.iscoroutine(result):
await result
# 成功处理,清除重试记录
if event_id in self.retry_attempts:
del self.retry_attempts[event_id]
return
except Exception as e:
last_error = e
retry_count += 1
self.retry_attempts[event_id] = retry_count
if retry_count <= self.retry_config.max_retries:
# 计算退避延迟
delay = min(
self.retry_config.initial_delay * (
self.retry_config.backoff_factor ** (retry_count - 1)
),
self.retry_config.max_delay
)
self.logger.warning(
f"事件处理失败,{delay:.2f}秒后重试 "
f"({retry_count}/{self.retry_config.max_retries}): {e}"
)
await asyncio.sleep(delay)
else:
# 重试次数用完,记录错误
self._record_error(event, last_error)
raise last_error
async def _handle_processing_error(self, event: StreamEvent, error: Exception):
"""处理事件处理错误"""
error_type = type(error).__name__
self.error_stats[error_type] = self.error_stats.get(error_type, 0) + 1
# 记录失败的事件详情
failed_event = {
"event_type": getattr(event, 'type', 'unknown'),
"error_type": error_type,
"error_message": str(error),
"timestamp": datetime.now().isoformat(),
"retry_attempts": self.retry_attempts.get(self._get_event_id(event), 0)
}
self.failed_events.append(failed_event)
self.logger.error(f"事件处理失败: {error}")
def _record_error(self, event: StreamEvent, error: Exception):
"""记录错误"""
error_type = type(error).__name__
self.error_stats[error_type] = self.error_stats.get(error_type, 0) + 1
def _get_event_id(self, event: StreamEvent) -> str:
"""获取事件唯一标识"""
event_type = getattr(event, 'type', 'unknown')
timestamp = getattr(event, 'timestamp', id(event))
return f"{event_type}_{timestamp}"
def get_error_report(self) -> Dict[str, Any]:
"""获取错误报告"""
return {
"error_statistics": self.error_stats,
"failed_events": self.failed_events,
"active_retries": self.retry_attempts
}
# 使用示例
async def resilient_streaming_example():
"""弹性流式处理示例"""
# 创建可能出错的事件处理函数
processed_events = 0
async def potentially_failing_handler(event):
nonlocal processed_events
processed_events += 1
# 模拟不同类型的错误
import random
if random.random() < 0.1: # 10% 概率网络错误
raise ConnectionError("模拟网络连接错误")
if random.random() < 0.05: # 5% 概率数据错误
raise ValueError("模拟数据格式错误")
if random.random() < 0.02: # 2% 概率系统错误
raise RuntimeError("模拟系统内部错误")
# 正常处理
if hasattr(event, 'type'):
print(f"✅ 处理事件: {event.type}")
# 模拟处理时间
await asyncio.sleep(0.01)
# 测试不同的处理策略
strategies = [
EventProcessingStrategy.SKIP_ERROR,
EventProcessingStrategy.RETRY_ON_ERROR
]
for strategy in strategies:
print(f"\n=== 测试策略: {strategy.value} ===")
processor = RobustStreamProcessor(
strategy=strategy,
retry_config=RetryConfig(
max_retries=2,
initial_delay=0.1,
backoff_factor=2.0
)
)
# 创建测试代理和流式结果
agent = Agent(
name="TestAgent",
instructions="简单回答问题,用于测试流式处理。"
)
result = Runner.run_streamed(agent, "请说一个笑话。")
try:
stats = await processor.process_stream_with_resilience(
result.stream_events(),
potentially_failing_handler
)
print(f"处理统计: {stats}")
# 获取错误报告
error_report = processor.get_error_report()
if error_report["error_statistics"]:
print(f"错误报告: {error_report}")
except Exception as e:
print(f"处理失败: {e}")
# 获取最终结果
try:
final_output = await result.final_output
print(f"最终输出: {final_output[:100]}...")
except Exception as e:
print(f"获取最终结果失败: {e}")
# 运行弹性处理示例
# asyncio.run(resilient_streaming_example())
Streaming模块通过事件驱动的架构和异步队列机制,为OpenAI Agents提供了高效的流式数据处理能力,支持从基础文本流到复杂实时交互的各种场景需求。
API接口
1. API 总览
Streaming 模块提供了流式事件系统,支持实时获取Agent执行过程中的各种事件。
API 分类
| API类别 | 核心API | 功能描述 |
|---|---|---|
| 流式执行 | run_streamed(agent, input) | 流式运行Agent |
| 事件迭代 | async for event in stream | 异步迭代事件 |
| 事件类型 | RunStartEvent | 运行开始事件 |
| 事件类型 | RunStepDoneEvent | 步骤完成事件 |
| 事件类型 | RunDoneEvent | 运行完成事件 |
2. run_streamed() API
API签名:
async def run_streamed(
agent: Agent,
input: str | list,
*,
config: RunConfig | None = None,
**kwargs
) -> AsyncIterator[StreamEvent]
使用示例:
from agents import Agent, run_streamed
agent = Agent(name="assistant", instructions="你是助手")
# 流式执行
async for event in run_streamed(agent, "你好"):
if event["type"] == "response.text.delta":
print(event["delta"], end="", flush=True)
elif event["type"] == "response.done":
print("\n完成!")
3. 流式事件类型
3.1 RunStartEvent
{
"type": "run.start",
"run_id": "run_abc123"
}
3.2 TextDeltaEvent
{
"type": "response.text.delta",
"delta": "你好",
"content_index": 0
}
3.3 FunctionCallEvent
{
"type": "response.function_call_arguments.delta",
"delta": '{"query": "天气"}',
"call_id": "call_123"
}
3.4 RunDoneEvent
{
"type": "run.done",
"run_result": RunResult(...)
}
4. 高级用法
4.1 事件过滤
async for event in run_streamed(agent, "查询"):
# 只处理文本增量
if event["type"] == "response.text.delta":
handle_text(event["delta"])
4.2 进度追踪
async for event in run_streamed(agent, input):
if event["type"] == "run.step.done":
step_num = event.get("step_number")
print(f"完成步骤 {step_num}")
4.3 实时UI更新
async for event in run_streamed(agent, query):
if event["type"] == "response.text.delta":
await websocket.send_text(event["delta"])
elif event["type"] == "response.function_call":
await websocket.send_json({
"tool": event["name"],
"status": "calling"
})
Streaming模块通过丰富的事件类型,支持构建实时响应的AI应用。
数据结构
1. 流式事件结构
classDiagram
class StreamEvent {
<<union>>
RunStartEvent
TextDeltaEvent
FunctionCallEvent
RunStepDoneEvent
RunDoneEvent
}
class RunStartEvent {
+type: "run.start"
+run_id: str
}
class TextDeltaEvent {
+type: "response.text.delta"
+delta: str
+content_index: int
}
class FunctionCallEvent {
+type: "response.function_call_arguments.delta"
+delta: str
+call_id: str
}
class RunDoneEvent {
+type: "run.done"
+run_result: RunResult
}
StreamEvent <|-- RunStartEvent
StreamEvent <|-- TextDeltaEvent
StreamEvent <|-- FunctionCallEvent
StreamEvent <|-- RunDoneEvent
2. 事件类型详解
2.1 文本事件
from typing import Literal, TypedDict

# 文本增量事件
class TextDeltaEvent(TypedDict):
type: Literal["response.text.delta"]
delta: str # 文本片段
content_index: int # 内容索引
# 文本完成事件
class TextDoneEvent(TypedDict):
type: Literal["response.output_item.done"]
item: ResponseTextItem # 完整文本项
2.2 工具调用事件
# 工具参数增量
class FunctionCallArgumentsDelta(TypedDict):
type: Literal["response.function_call_arguments.delta"]
delta: str # 参数JSON片段
call_id: str # 调用ID
# 工具调用完成
class FunctionCallDone(TypedDict):
type: Literal["response.function_call.done"]
call_id: str
name: str
arguments: str # 完整参数JSON
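参数增量事件的典型消费方式是按 call_id 累积片段,收到完成事件后再整体解析。下面是一个示意(字段含义与上面的 TypedDict 对应,事件内容为假设示例):
import json

args_buffers: dict[str, str] = {}  # call_id -> 已累积的参数 JSON 片段

def on_args_delta(call_id: str, delta: str) -> None:
    args_buffers[call_id] = args_buffers.get(call_id, "") + delta

def on_args_done(call_id: str) -> dict:
    return json.loads(args_buffers.pop(call_id, "{}"))

on_args_delta("call_123", '{"quer')
on_args_delta("call_123", 'y": "天气"}')
assert on_args_done("call_123") == {"query": "天气"}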
2.3 运行事件
# 运行开始
class RunStartEvent(TypedDict):
type: Literal["run.start"]
run_id: str
# 步骤完成
class RunStepDoneEvent(TypedDict):
type: Literal["run.step.done"]
step_number: int
# 运行完成
class RunDoneEvent(TypedDict):
type: Literal["run.done"]
run_result: RunResult
3. 事件流结构
graph TD
A[run_streamed] --> B[RunStartEvent]
B --> C{事件循环}
C --> D[TextDeltaEvent*]
C --> E[FunctionCallEvent*]
C --> F[RunStepDoneEvent*]
D --> C
E --> C
F --> C
C --> G[RunDoneEvent]
G --> H[结束]
4. 事件聚合器
class EventAggregator:
"""聚合流式事件为完整对象"""
    def __init__(self) -> None:
        self.text_buffers: dict[int, str] = {}  # 文本缓冲
        self.args_buffers: dict[str, str] = {}  # 参数缓冲
def add_text_delta(self, index: int, delta: str):
"""累积文本增量"""
self.text_buffers[index] = self.text_buffers.get(index, "") + delta
def add_args_delta(self, call_id: str, delta: str):
"""累积参数增量"""
self.args_buffers[call_id] = self.args_buffers.get(call_id, "") + delta
def get_complete_text(self, index: int) -> str:
"""获取完整文本"""
return self.text_buffers.get(index, "")
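基于上面的 EventAggregator,一个简单的使用示意如下(事件内容为假设示例):
agg = EventAggregator()
agg.add_text_delta(0, "你")
agg.add_text_delta(0, "好")
agg.add_args_delta("call_123", '{"query": ')
agg.add_args_delta("call_123", '"天气"}')
print(agg.get_complete_text(0))       # -> 你好
print(agg.args_buffers["call_123"])   # -> {"query": "天气"}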
5. 数据流转
sequenceDiagram
participant Model as LLM API
participant Stream as StreamHandler
participant Agg as Aggregator
participant User as 用户代码
Model->>Stream: chunk 1: "你"
Stream->>User: TextDeltaEvent("你")
Stream->>Agg: add_delta("你")
Model->>Stream: chunk 2: "好"
Stream->>User: TextDeltaEvent("好")
Stream->>Agg: add_delta("好")
Model->>Stream: done
Agg->>Agg: get_complete_text() -> "你好"
Stream->>User: TextDoneEvent("你好")
Streaming模块通过结构化的事件系统,实现了细粒度的实时数据传输。
时序图
1. 流式执行完整时序图
sequenceDiagram
participant U as 用户
participant R as Runner
participant M as Model
participant S as StreamHandler
participant A as Agent
U->>R: run_streamed(agent, input)
activate R
R->>U: yield RunStartEvent
R->>A: 执行Agent
activate A
A->>M: stream_response()
activate M
loop 流式数据
M-->>S: SSE chunk
alt 文本增量
S->>U: yield TextDeltaEvent
else 工具调用
S->>U: yield FunctionCallEvent
else 步骤完成
S->>U: yield StepDoneEvent
end
end
M-->>A: 完成
deactivate M
A-->>R: 结果
deactivate A
R->>U: yield RunDoneEvent
deactivate R
2. 文本流式处理
sequenceDiagram
participant API as LLM API
participant H as StreamHandler
participant B as TextBuffer
participant U as 用户
API->>H: chunk: "你"
H->>U: TextDeltaEvent("你")
H->>B: append("你")
API->>H: chunk: "好"
H->>U: TextDeltaEvent("好")
H->>B: append("好")
API->>H: chunk: "!"
H->>U: TextDeltaEvent("!")
H->>B: append("!")
API->>H: done
H->>B: get_complete() -> "你好!"
H->>U: TextDoneEvent("你好!")
3. 工具调用流式处理
sequenceDiagram
participant M as Model
participant H as Handler
participant B as ArgsBuffer
participant U as 用户
participant T as Tool
M->>H: function_call_start
H->>U: FunctionCallStartEvent
loop 参数流式传输
M->>H: args delta: '{"quer'
H->>U: ArgsDeltaEvent
H->>B: append('{"quer')
M->>H: args delta: 'y": "天气"}'
H->>U: ArgsDeltaEvent
H->>B: append('y": "天气"}')
end
M->>H: function_call_done
H->>B: parse_json() -> {"query": "天气"}
H->>U: FunctionCallDoneEvent
H->>T: execute_tool(args)
T-->>H: 结果
H->>U: ToolOutputEvent
4. 多步骤流式执行
sequenceDiagram
participant U as 用户
participant R as Runner
U->>R: run_streamed(agent, input)
activate R
R->>U: RunStartEvent
Note over R: Turn 1
R->>U: TextDeltaEvent*
R->>U: FunctionCallEvent
R->>U: StepDoneEvent(step=1)
Note over R: Turn 2
R->>U: ToolExecutionEvent
R->>U: TextDeltaEvent*
R->>U: StepDoneEvent(step=2)
Note over R: Turn 3
R->>U: TextDeltaEvent*
R->>U: StepDoneEvent(step=3)
R->>U: RunDoneEvent(result)
deactivate R
5. 错误处理流
sequenceDiagram
participant U as 用户
participant R as Runner
participant M as Model
U->>R: run_streamed(agent, input)
R->>U: RunStartEvent
R->>M: stream_response()
loop 正常流式
M-->>U: TextDeltaEvent
end
alt 发生错误
M-->>R: Exception
R->>U: RunErrorEvent
R->>U: RunDoneEvent(error)
else 正常完成
M-->>R: 完成
R->>U: RunDoneEvent(success)
end
6. 实时UI更新流程
sequenceDiagram
participant Backend as 后端
participant WS as WebSocket
participant UI as 前端UI
Backend->>WS: RunStartEvent
WS->>UI: 显示"思考中..."
loop 流式文本
Backend->>WS: TextDeltaEvent("你")
WS->>UI: 追加"你"
Backend->>WS: TextDeltaEvent("好")
WS->>UI: 追加"好"
end
Backend->>WS: FunctionCallEvent
WS->>UI: 显示"调用工具..."
Backend->>WS: ToolOutputEvent
WS->>UI: 显示工具结果
Backend->>WS: RunDoneEvent
WS->>UI: 完成,允许新输入
Streaming模块通过精心设计的时序流程,实现了流畅的实时交互体验。