7.1 代理协作层架构概览
代理协作层是OpenAI Agents SDK的核心创新之一,实现了多个AI代理之间的协作和任务委托。该层包含两个主要组件:
- Handoffs (代理切换): 实现代理间的任务转移和上下文传递
- Guardrails (安全防护): 提供输入输出的安全检查和过滤机制
7.2 代理协作架构图
graph TD
A[主代理] --> B{需要协作?}
B -->|是| C[Handoff 代理切换]
B -->|否| D[继续当前代理]
C --> E[选择目标代理]
E --> F[应用输入过滤器]
F --> G[传递上下文]
G --> H[启动目标代理]
H --> I[目标代理执行]
I --> J[生成输出]
J --> K[应用输出防护]
K --> L{防护通过?}
L -->|是| M[返回结果]
L -->|否| N[触发安全警报]
D --> O[输入防护检查]
O --> P{防护通过?}
P -->|是| Q[执行代理逻辑]
P -->|否| R[拒绝执行]
style C fill:#e3f2fd
style K fill:#ffebee
style O fill:#ffebee
7.3 Handoffs (代理切换) 深度分析
7.3.1 Handoff核心数据结构
# 位于 src/agents/handoffs.py
@dataclass
class Handoff(Generic[TContext, TAgent]):
"""
代理切换的核心配置类
定义了如何从一个代理切换到另一个代理,包括:
- 工具接口定义
- 输入处理逻辑
- 上下文传递规则
"""
tool_name: str # 切换工具的名称
tool_description: str # 切换工具的描述
input_json_schema: dict[str, Any] # 输入参数的JSON Schema
on_invoke_handoff: Callable[[RunContextWrapper[Any], str], Awaitable[TAgent]] # 切换处理函数
agent_name: str # 目标代理名称
strict_json_schema: bool = True # 是否使用严格JSON Schema
input_filter: HandoffInputFilter | None = None # 输入过滤器
is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True # 启用状态
def __post_init__(self):
"""初始化后处理"""
if self.strict_json_schema:
self.input_json_schema = ensure_strict_json_schema(self.input_json_schema)
def to_openai_format(self) -> dict[str, Any]:
"""转换为OpenAI API切换格式"""
return {
"type": "handoff",
"handoff": {
"name": self.tool_name,
"description": self.tool_description,
"parameters": self.input_json_schema,
"agent_name": self.agent_name,
}
}
@dataclass(frozen=True)
class HandoffInputData:
"""
代理切换时传递的输入数据
包含切换发生时的完整上下文信息
"""
input_history: str | tuple[TResponseInputItem, ...] # 输入历史
pre_handoff_items: tuple[RunItem, ...] # 切换前的运行项目
new_items: tuple[RunItem, ...] # 新生成的项目
run_context: RunContextWrapper[Any] | None = None # 运行上下文
def clone(self, **kwargs: Any) -> HandoffInputData:
"""创建修改后的副本"""
return dataclasses_replace(self, **kwargs)
# 输入过滤器类型定义
HandoffInputFilter: TypeAlias = Callable[[HandoffInputData], MaybeAwaitable[HandoffInputData]]
7.3.2 handoff装饰器实现
# 位于 src/agents/handoffs.py
def handoff(
agent: Agent[TContext] | Callable[[], Agent[TContext]],
*,
name: str | None = None,
description: str | None = None,
input_filter: HandoffInputFilter | None = None,
is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True,
) -> Handoff[TContext, Agent[TContext]]:
"""
创建代理切换配置的便捷函数
将Agent实例转换为可切换的Handoff配置
Args:
agent: 目标代理实例或创建代理的函数
name: 切换工具名称(可选)
description: 切换工具描述(可选)
input_filter: 输入过滤器(可选)
is_enabled: 启用状态(可选)
Returns:
Handoff: 配置好的代理切换对象
"""
# 1. 解析代理实例
if callable(agent) and not isinstance(agent, Agent):
# 代理工厂函数
agent_factory = agent
target_agent = agent() # 调用工厂函数获取代理实例
else:
# 直接的代理实例
target_agent = cast(Agent[TContext], agent)
agent_factory = lambda: target_agent
# 2. 生成工具名称和描述
tool_name = name or f"handoff_to_{_transforms.transform_string_function_style(target_agent.name)}"
tool_description = description or (
target_agent.handoff_description or
f"Hand off the conversation to {target_agent.name}"
)
# 3. 生成输入Schema(如果代理需要特定输入)
input_schema = _generate_handoff_input_schema(target_agent)
# 4. 创建切换处理函数
async def on_invoke_handoff(
context: RunContextWrapper[Any],
arguments_json: str
) -> Agent[TContext]:
"""处理代理切换的内部函数"""
# 解析参数(如果有)
if arguments_json.strip():
try:
arguments = json.loads(arguments_json)
except json.JSONDecodeError as e:
raise ModelBehaviorError(f"Invalid handoff arguments JSON: {e}")
else:
arguments = {}
# 获取目标代理实例
if callable(agent) and not isinstance(agent, Agent):
# 使用工厂函数创建新实例
resolved_agent = agent_factory()
else:
# 使用现有实例
resolved_agent = target_agent
# 如果有参数,可能需要基于参数定制代理
if arguments:
resolved_agent = _customize_agent_with_arguments(resolved_agent, arguments)
return resolved_agent
# 5. 创建Handoff实例
return Handoff(
tool_name=tool_name,
tool_description=tool_description,
input_json_schema=input_schema,
on_invoke_handoff=on_invoke_handoff,
agent_name=target_agent.name,
input_filter=input_filter,
is_enabled=is_enabled,
)
def _generate_handoff_input_schema(agent: Agent) -> dict[str, Any]:
"""
为代理切换生成输入Schema
分析目标代理是否需要特殊的初始化参数
"""
# 检查代理的instructions是否为函数
if callable(agent.instructions):
# 如果instructions是函数,可能需要特定参数
sig = inspect.signature(agent.instructions)
params = [p for name, p in sig.parameters.items()
if name not in ['context', 'agent']]
if params:
# 为函数参数生成schema
return function_schema(agent.instructions, exclude_first_param=True)
# 检查是否有动态配置需求
if hasattr(agent, '_handoff_input_schema'):
return agent._handoff_input_schema
# 默认的空schema
return {
"type": "object",
"properties": {},
"additionalProperties": False,
}
def _customize_agent_with_arguments(agent: Agent, arguments: dict[str, Any]) -> Agent:
"""
基于切换参数定制代理
允许在切换时动态配置目标代理
"""
# 如果代理有定制化方法,调用它
if hasattr(agent, 'customize_for_handoff'):
return agent.customize_for_handoff(arguments)
# 检查是否需要修改instructions
if callable(agent.instructions) and arguments:
# 创建包装函数,将arguments传递给instructions函数
original_instructions = agent.instructions
def wrapped_instructions(context, agent_instance):
# 将handoff参数添加到context中
context.handoff_arguments = arguments
return original_instructions(context, agent_instance)
return agent.clone(instructions=wrapped_instructions)
return agent
7.3.3 代理切换执行流程
# 位于 src/agents/_run_impl.py
@classmethod
async def _handle_handoff(
cls,
handoff_call_item: HandoffCallItem,
context_wrapper: RunContextWrapper[Any],
hooks: RunHooks[Any],
global_handoff_filter: HandoffInputFilter | None,
) -> NextStepHandoff:
"""
处理代理切换的完整流程
执行步骤:
1. 查找匹配的切换配置
2. 调用切换处理函数
3. 应用输入过滤器
4. 执行切换钩子
5. 返回新代理
"""
handoff_name = handoff_call_item.handoff_name
handoff_arguments = handoff_call_item.arguments
# 1. 查找匹配的切换配置
matching_handoff: Handoff | None = None
current_agent = context_wrapper.current_agent
for handoff in current_agent.handoffs:
if isinstance(handoff, Handoff) and handoff.tool_name == handoff_name:
matching_handoff = handoff
break
elif isinstance(handoff, Agent) and handoff.name == handoff_name:
# 直接的Agent实例,创建临时Handoff
matching_handoff = create_temporary_handoff(handoff)
break
if not matching_handoff:
raise ModelBehaviorError(f"Handoff '{handoff_name}' not found in agent handoffs")
# 2. 检查切换是否启用
if not await _check_handoff_enabled(matching_handoff, context_wrapper, current_agent):
raise ModelBehaviorError(f"Handoff '{handoff_name}' is currently disabled")
# 3. 调用切换处理函数获取新代理
try:
new_agent = await matching_handoff.on_invoke_handoff(
context_wrapper,
handoff_arguments
)
except Exception as e:
error_msg = f"Failed to execute handoff '{handoff_name}': {str(e)}"
logger.error(error_msg, exc_info=True)
raise ModelBehaviorError(error_msg) from e
# 4. 准备切换输入数据
handoff_input_data = HandoffInputData(
input_history=context_wrapper.original_input,
pre_handoff_items=tuple(context_wrapper.pre_handoff_items),
new_items=tuple(context_wrapper.new_items),
run_context=context_wrapper,
)
# 5. 应用输入过滤器
effective_filter = matching_handoff.input_filter or global_handoff_filter
if effective_filter:
try:
filtered_data = await _apply_handoff_input_filter(
effective_filter,
handoff_input_data
)
# 更新上下文以反映过滤后的数据
context_wrapper.update_for_handoff(filtered_data)
except Exception as e:
error_msg = f"Handoff input filter failed for '{handoff_name}': {str(e)}"
logger.error(error_msg, exc_info=True)
raise ModelBehaviorError(error_msg) from e
# 6. 执行切换钩子
await asyncio.gather(
hooks.on_handoff(context_wrapper, current_agent, new_agent),
(current_agent.hooks.on_handoff(context_wrapper, current_agent, new_agent)
if current_agent.hooks else _coro.noop_coroutine()),
)
# 7. 记录切换追踪信息
with handoff_span(
from_agent=current_agent.name,
to_agent=new_agent.name,
handoff_name=handoff_name,
) as span:
span.span_data.arguments = handoff_arguments
span.span_data.filter_applied = effective_filter is not None
return NextStepHandoff(new_agent=new_agent)
async def _apply_handoff_input_filter(
filter_func: HandoffInputFilter,
input_data: HandoffInputData,
) -> HandoffInputData:
"""应用切换输入过滤器"""
# 调用过滤器函数
result = filter_func(input_data)
# 处理异步结果
if inspect.isawaitable(result):
result = await result
# 验证过滤器结果
if not isinstance(result, HandoffInputData):
raise TypeError(f"Handoff input filter must return HandoffInputData, got {type(result)}")
return result
async def _check_handoff_enabled(
handoff: Handoff,
context: RunContextWrapper[Any],
agent: Agent[Any],
) -> bool:
"""检查切换是否启用"""
is_enabled = handoff.is_enabled
if isinstance(is_enabled, bool):
return is_enabled
# 调用启用检查函数
result = is_enabled(context, agent)
if inspect.isawaitable(result):
result = await result
return bool(result)
7.3.4 切换输入过滤器系统
# 位于 src/agents/extensions/handoff_filters.py
def remove_all_tools(handoff_data: HandoffInputData) -> HandoffInputData:
"""
移除所有工具相关的项目
用于在切换时清理工具调用历史,避免上下文污染
"""
def is_tool_related(item: TResponseInputItem) -> bool:
"""判断项目是否与工具相关"""
if isinstance(item, dict):
return item.get("type") in ["tool", "tool_result", "function_call"]
# 检查其他工具相关的属性
return hasattr(item, "tool_calls") or hasattr(item, "tool_call_id")
# 过滤输入历史
if isinstance(handoff_data.input_history, tuple):
filtered_history = tuple(
item for item in handoff_data.input_history
if not is_tool_related(item)
)
else:
# 字符串类型的历史不需要过滤
filtered_history = handoff_data.input_history
# 过滤运行项目
filtered_pre_handoff = tuple(
item for item in handoff_data.pre_handoff_items
if not isinstance(item, (ToolCallItem, ToolCallOutputItem))
)
filtered_new_items = tuple(
item for item in handoff_data.new_items
if not isinstance(item, (ToolCallItem, ToolCallOutputItem))
)
return handoff_data.clone(
input_history=filtered_history,
pre_handoff_items=filtered_pre_handoff,
new_items=filtered_new_items,
)
def keep_last_n_messages(n: int) -> HandoffInputFilter:
"""
保留最后N条消息的过滤器工厂
Args:
n: 要保留的消息数量
Returns:
HandoffInputFilter: 过滤器函数
"""
def filter_func(handoff_data: HandoffInputData) -> HandoffInputData:
if isinstance(handoff_data.input_history, tuple) and len(handoff_data.input_history) > n:
filtered_history = handoff_data.input_history[-n:]
return handoff_data.clone(input_history=filtered_history)
return handoff_data
return filter_func
def remove_system_messages(handoff_data: HandoffInputData) -> HandoffInputData:
"""移除系统消息的过滤器"""
if isinstance(handoff_data.input_history, tuple):
filtered_history = tuple(
item for item in handoff_data.input_history
if not (isinstance(item, dict) and item.get("role") == "system")
)
return handoff_data.clone(input_history=filtered_history)
return handoff_data
def add_context_summary(summary: str) -> HandoffInputFilter:
"""
添加上下文摘要的过滤器工厂
在切换时为新代理提供前序上下文的摘要
"""
def filter_func(handoff_data: HandoffInputData) -> HandoffInputData:
# 创建摘要消息
summary_message = {
"role": "system",
"content": f"Context summary: {summary}",
}
if isinstance(handoff_data.input_history, tuple):
# 在历史消息前添加摘要
new_history = (summary_message,) + handoff_data.input_history
return handoff_data.clone(input_history=new_history)
else:
# 字符串历史,转换为消息列表
original_message = {
"role": "user",
"content": handoff_data.input_history,
}
new_history = (summary_message, original_message)
return handoff_data.clone(input_history=new_history)
return filter_func
7.4 Guardrails (安全防护) 深度分析
7.4.1 防护基础接口
# 位于 src/agents/guardrail.py
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
TContext = TypeVar("TContext")
@dataclass
class GuardrailFunctionOutput:
"""防护函数的输出结果"""
tripwire_triggered: bool # 是否触发警报
message: str # 说明消息
severity: Literal["low", "medium", "high"] = "medium" # 严重程度
metadata: dict[str, Any] = field(default_factory=dict) # 额外元数据
class InputGuardrail(ABC, Generic[TContext]):
"""输入防护的抽象基类"""
@abstractmethod
async def check(
self,
context: RunContextWrapper[TContext],
agent: Agent[TContext],
input_data: str | list[TResponseInputItem],
) -> GuardrailFunctionOutput:
"""检查输入是否安全"""
pass
@property
@abstractmethod
def name(self) -> str:
"""防护名称"""
pass
def get_name(self) -> str:
"""获取防护名称"""
return self.name
class OutputGuardrail(ABC, Generic[TContext]):
"""输出防护的抽象基类"""
@abstractmethod
async def check(
self,
context: RunContextWrapper[TContext],
agent: Agent[TContext],
output: Any,
) -> GuardrailFunctionOutput:
"""检查输出是否安全"""
pass
@property
@abstractmethod
def name(self) -> str:
"""防护名称"""
pass
def get_name(self) -> str:
"""获取防护名称"""
return self.name
@dataclass
class InputGuardrailResult:
"""输入防护检查结果"""
guardrail: InputGuardrail[Any] # 执行的防护
output: GuardrailFunctionOutput # 防护输出
execution_time: float # 执行时间
@dataclass
class OutputGuardrailResult:
"""输出防护检查结果"""
guardrail: OutputGuardrail[Any] # 执行的防护
output: GuardrailFunctionOutput # 防护输出
execution_time: float # 执行时间
7.4.2 防护装饰器实现
# 位于 src/agents/guardrail.py
def input_guardrail(
func: Callable[..., MaybeAwaitable[GuardrailFunctionOutput]] | None = None,
*,
name: str | None = None,
) -> InputGuardrail[Any] | Callable[..., InputGuardrail[Any]]:
"""
输入防护装饰器
将函数转换为输入防护检查器
支持的函数签名:
1. func(context: RunContextWrapper, agent: Agent, input_data) -> GuardrailFunctionOutput
2. func(input_data: str) -> GuardrailFunctionOutput
"""
def decorator(f: Callable[..., MaybeAwaitable[GuardrailFunctionOutput]]) -> InputGuardrail[Any]:
# 分析函数签名
sig = inspect.signature(f)
params = list(sig.parameters.values())
# 确定函数调用模式
if len(params) >= 3:
# 完整参数模式: (context, agent, input_data)
call_mode = "full"
elif len(params) == 1:
# 简单模式: (input_data)
call_mode = "simple"
else:
raise ValueError(
f"Invalid input guardrail function signature. Expected (context, agent, input_data) "
f"or (input_data), got {len(params)} parameters"
)
class FunctionInputGuardrail(InputGuardrail[Any]):
@property
def name(self) -> str:
return name or f.__name__
async def check(
self,
context: RunContextWrapper[Any],
agent: Agent[Any],
input_data: str | list[TResponseInputItem],
) -> GuardrailFunctionOutput:
"""执行防护检查"""
try:
# 根据调用模式调用函数
if call_mode == "full":
result = f(context, agent, input_data)
else: # simple
# 简单模式,将输入转换为字符串
if isinstance(input_data, list):
input_str = _convert_input_list_to_string(input_data)
else:
input_str = input_data
result = f(input_str)
# 处理异步结果
if inspect.isawaitable(result):
result = await result
# 验证结果类型
if not isinstance(result, GuardrailFunctionOutput):
raise TypeError(
f"Input guardrail function must return GuardrailFunctionOutput, "
f"got {type(result)}"
)
return result
except Exception as e:
# 防护检查出错,记录错误并返回安全结果
logger.error(f"Input guardrail '{self.name}' failed: {e}", exc_info=True)
return GuardrailFunctionOutput(
tripwire_triggered=True,
message=f"Guardrail check failed: {str(e)}",
severity="high",
metadata={"error": str(e), "error_type": type(e).__name__}
)
return FunctionInputGuardrail()
if func is not None:
return decorator(func)
else:
return decorator
def output_guardrail(
func: Callable[..., MaybeAwaitable[GuardrailFunctionOutput]] | None = None,
*,
name: str | None = None,
) -> OutputGuardrail[Any] | Callable[..., OutputGuardrail[Any]]:
"""
输出防护装饰器
将函数转换为输出防护检查器
"""
def decorator(f: Callable[..., MaybeAwaitable[GuardrailFunctionOutput]]) -> OutputGuardrail[Any]:
# 分析函数签名
sig = inspect.signature(f)
params = list(sig.parameters.values())
if len(params) >= 3:
call_mode = "full" # (context, agent, output)
elif len(params) == 1:
call_mode = "simple" # (output)
else:
raise ValueError(
f"Invalid output guardrail function signature. Expected (context, agent, output) "
f"or (output), got {len(params)} parameters"
)
class FunctionOutputGuardrail(OutputGuardrail[Any]):
@property
def name(self) -> str:
return name or f.__name__
async def check(
self,
context: RunContextWrapper[Any],
agent: Agent[Any],
output: Any,
) -> GuardrailFunctionOutput:
"""执行输出防护检查"""
try:
# 根据调用模式调用函数
if call_mode == "full":
result = f(context, agent, output)
else: # simple
result = f(output)
# 处理异步结果
if inspect.isawaitable(result):
result = await result
# 验证结果类型
if not isinstance(result, GuardrailFunctionOutput):
raise TypeError(
f"Output guardrail function must return GuardrailFunctionOutput, "
f"got {type(result)}"
)
return result
except Exception as e:
logger.error(f"Output guardrail '{self.name}' failed: {e}", exc_info=True)
return GuardrailFunctionOutput(
tripwire_triggered=True,
message=f"Output guardrail check failed: {str(e)}",
severity="high",
metadata={"error": str(e), "error_type": type(e).__name__}
)
return FunctionOutputGuardrail()
if func is not None:
return decorator(func)
else:
return decorator
def _convert_input_list_to_string(input_list: list[TResponseInputItem]) -> str:
"""将输入项目列表转换为字符串"""
text_parts = []
for item in input_list:
if isinstance(item, dict):
content = item.get("content", "")
if isinstance(content, str):
text_parts.append(content)
elif isinstance(content, list):
# 处理复杂内容
for content_item in content:
if isinstance(content_item, dict) and content_item.get("type") == "text":
text_parts.append(content_item.get("text", ""))
return "\n".join(text_parts)
7.4.3 防护执行系统
# 位于 src/agents/_run_impl.py
@classmethod
async def run_single_input_guardrail(
cls,
agent: Agent[Any],
guardrail: InputGuardrail[TContext],
input_data: str | list[TResponseInputItem],
context: RunContextWrapper[TContext],
) -> InputGuardrailResult:
"""
运行单个输入防护检查
包含完整的执行、计时和错误处理逻辑
"""
start_time = time.time()
# 开始防护追踪span
with guardrail_span(
guardrail_name=guardrail.get_name(),
guardrail_type="input",
) as span:
try:
# 执行防护检查
output = await guardrail.check(context, agent, input_data)
# 记录追踪信息
span.span_data.result = output.tripwire_triggered
span.span_data.message = output.message
span.span_data.severity = output.severity
execution_time = time.time() - start_time
return InputGuardrailResult(
guardrail=guardrail,
output=output,
execution_time=execution_time,
)
except Exception as e:
# 防护执行异常
execution_time = time.time() - start_time
error_output = GuardrailFunctionOutput(
tripwire_triggered=True,
message=f"Guardrail execution failed: {str(e)}",
severity="high",
metadata={"error": str(e), "error_type": type(e).__name__}
)
# 记录错误到追踪
_error_tracing.attach_error_to_span(
span,
SpanError(
message=f"Input guardrail '{guardrail.get_name()}' execution failed",
data={"error": str(e)}
)
)
return InputGuardrailResult(
guardrail=guardrail,
output=error_output,
execution_time=execution_time,
)
@classmethod
async def run_single_output_guardrail(
cls,
guardrail: OutputGuardrail[TContext],
agent: Agent[TContext],
output: Any,
context: RunContextWrapper[TContext],
) -> OutputGuardrailResult:
"""运行单个输出防护检查"""
start_time = time.time()
with guardrail_span(
guardrail_name=guardrail.get_name(),
guardrail_type="output",
) as span:
try:
result = await guardrail.check(context, agent, output)
span.span_data.result = result.tripwire_triggered
span.span_data.message = result.message
span.span_data.severity = result.severity
execution_time = time.time() - start_time
return OutputGuardrailResult(
guardrail=guardrail,
output=result,
execution_time=execution_time,
)
except Exception as e:
execution_time = time.time() - start_time
error_output = GuardrailFunctionOutput(
tripwire_triggered=True,
message=f"Output guardrail execution failed: {str(e)}",
severity="high",
metadata={"error": str(e)}
)
_error_tracing.attach_error_to_span(
span,
SpanError(
message=f"Output guardrail '{guardrail.get_name()}' execution failed",
data={"error": str(e)}
)
)
return OutputGuardrailResult(
guardrail=guardrail,
output=error_output,
execution_time=execution_time,
)
7.4.4 常用防护示例实现
# 示例:内容安全防护
@input_guardrail
def content_safety_check(input_data: str) -> GuardrailFunctionOutput:
"""检查输入内容是否安全"""
# 定义禁止的关键词
prohibited_keywords = [
"暴力", "仇恨", "歧视", "色情", "非法",
"violence", "hate", "discrimination", "illegal"
]
input_lower = input_data.lower()
for keyword in prohibited_keywords:
if keyword in input_lower:
return GuardrailFunctionOutput(
tripwire_triggered=True,
message=f"Input contains prohibited content: {keyword}",
severity="high",
metadata={"matched_keyword": keyword}
)
return GuardrailFunctionOutput(
tripwire_triggered=False,
message="Content safety check passed"
)
# 示例:输出长度限制防护
@output_guardrail
def output_length_limit(output: Any) -> GuardrailFunctionOutput:
"""限制输出长度"""
MAX_LENGTH = 1000
output_str = str(output)
if len(output_str) > MAX_LENGTH:
return GuardrailFunctionOutput(
tripwire_triggered=True,
message=f"Output exceeds maximum length of {MAX_LENGTH} characters",
severity="medium",
metadata={"actual_length": len(output_str), "max_length": MAX_LENGTH}
)
return GuardrailFunctionOutput(
tripwire_triggered=False,
message="Output length check passed"
)
# 示例:敏感信息检测防护
@output_guardrail
def sensitive_info_detection(
context: RunContextWrapper,
agent: Agent,
output: Any
) -> GuardrailFunctionOutput:
"""检测输出中的敏感信息"""
import re
output_str = str(output)
# 检测常见敏感信息模式
patterns = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b\d{3}-\d{3}-\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
"credit_card": r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
}
detected_info = []
for info_type, pattern in patterns.items():
matches = re.findall(pattern, output_str)
if matches:
detected_info.append({
"type": info_type,
"count": len(matches),
"examples": matches[:2] # 只记录前两个例子
})
if detected_info:
return GuardrailFunctionOutput(
tripwire_triggered=True,
message="Output contains potentially sensitive information",
severity="high",
metadata={"detected_info": detected_info}
)
return GuardrailFunctionOutput(
tripwire_triggered=False,
message="No sensitive information detected"
)
这个代理协作层为OpenAI Agents SDK提供了强大的多代理协作能力和安全防护机制,使得复杂的多代理工作流能够安全可靠地运行。