概述
本指南基于大量生产环境实践经验,提供AutoGen框架从开发到部署的完整实战指导,包含性能优化、故障排查、监控告警等关键实践。
1. 快速开始指南
1.1 环境准备
Python环境设置
# Create an isolated virtual environment
python -m venv autogen-env
source autogen-env/bin/activate # Linux/Mac
# autogen-env\Scripts\activate # Windows
# Install the core packages
pip install autogen-core autogen-agentchat
# Install the extension packages.
# FIX: quote the spec — unquoted brackets are treated as a glob pattern by zsh.
pip install "autogen-ext[openai,azure,anthropic]"
# Developer tooling (tests, formatting, import sorting, type checking)
pip install pytest pytest-asyncio black isort mypy
.NET环境设置
# Requires .NET SDK 8.0+ — verify the installed version
dotnet --version
# Create a new console project
dotnet new console -n AutoGenApp
cd AutoGenApp
# Add the AutoGen packages (core runtime, agent abstractions, generic host)
dotnet add package Microsoft.AutoGen.Core
dotnet add package Microsoft.AutoGen.Agents
dotnet add package Microsoft.Extensions.Hosting
1.2 第一个代理应用
Python版本
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    """Minimal first AutoGen application: one assistant answering one task."""
    # Step 1: the model client that talks to the OpenAI API
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o-mini",
        api_key="your-openai-api-key",
    )

    # Step 2: an assistant agent backed by that client
    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,
        description="一个有用的AI助手",
    )

    # Step 3: stream the conversation to the terminal
    await Console(assistant.run_stream(
        task="请介绍一下AutoGen框架的主要特点"
    ))


if __name__ == "__main__":
    asyncio.run(main())
.NET版本
using Microsoft.AutoGen.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// FIX: in C#, top-level statements must come BEFORE any type declaration;
// the original placed the class first, which does not compile (CS8803).

// 1. Register AutoGen services with the generic host.
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddAutoGenCore();
builder.Services.AddAgent<GreetingAgent>();

var host = builder.Build();

// 2. Resolve the runtime and send a message to the greeting agent.
var runtime = host.Services.GetRequiredService<IAgentRuntime>();
var agentId = new AgentId("GreetingAgent", "default");
var response = await runtime.SendMessageAsync<string>(
    "Hello AutoGen!",
    agentId
);
Console.WriteLine(response);

// 3. The agent itself: echoes every greeting back to the sender.
public class GreetingAgent : Agent
{
    public GreetingAgent(ILogger<GreetingAgent> logger) : base(logger)
    {
    }

    // Invoked by the runtime for incoming string messages.
    [MessageHandler]
    public async Task<string> HandleGreeting(string message, MessageContext context)
    {
        return $"你好!你说:{message}";
    }
}
2. 核心概念实战
2.1 代理设计模式
单一职责代理
class WeatherAgent(AssistantAgent):
    """Single-responsibility agent that only answers weather questions."""

    def __init__(self, weather_api_key: str):
        super().__init__(
            name="weather_agent",
            model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
            tools=[self.get_weather],  # expose the lookup as a callable tool
            description="专业的天气查询助手",
            system_message="你是一个专业的天气查询助手,只回答天气相关的问题。"
        )
        self.weather_api_key = weather_api_key

    # FIX: the default was annotated `date: str = None`; None is not a str.
    async def get_weather(self, city: str, date: str | None = None) -> str:
        """Return the weather for *city*.

        Args:
            city: City name to look up.
            date: Optional date; defaults to today when omitted.

        Returns:
            A human-readable weather description.
        """
        # A real implementation would call a weather API using
        # self.weather_api_key; this stub returns a canned answer.
        return f"{city}今天天气晴朗,温度25°C"
# 使用示例
weather_agent = WeatherAgent("your-weather-api-key")
result = await weather_agent.run(task="北京今天天气怎么样?")
组合代理模式
class TravelPlannerAgent(AssistantAgent):
    """Composite planner that delegates to specialist agents via tools."""

    def __init__(self):
        # Each specialist agent is wrapped as a callable tool for the planner.
        specialists = [
            WeatherAgent("weather-key"),
            HotelAgent("hotel-key"),
            FlightAgent("flight-key"),
        ]
        super().__init__(
            name="travel_planner",
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            tools=[AgentTool(agent) for agent in specialists],
            description="专业的旅行规划助手",
            system_message="""
你是一个专业的旅行规划助手。你可以:
1. 查询天气信息
2. 搜索酒店
3. 查找航班
4. 制定详细的旅行计划
请根据用户需求,合理使用各种工具来制定最佳的旅行方案。
"""
        )
# 使用示例
planner = TravelPlannerAgent()
result = await planner.run(
task="我想下周去上海旅行3天,请帮我制定一个详细的计划"
)
2.2 团队协作模式
专家团队模式
async def create_expert_team():
    """Build a researcher → writer → reviewer round-robin team."""

    def _expert(name: str, description: str, system_message: str) -> AssistantAgent:
        # One model client per agent, mirroring the per-role setup.
        return AssistantAgent(
            name=name,
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            description=description,
            system_message=system_message,
        )

    researcher = _expert(
        "researcher",
        "专业的研究员,负责信息收集和分析",
        "你是一个专业的研究员,擅长收集和分析各种信息。",
    )
    writer = _expert(
        "writer",
        "专业的写作者,负责内容创作",
        "你是一个专业的写作者,擅长创作各种类型的文档。",
    )
    # The reviewer ends the conversation by emitting 'TERMINATE'.
    reviewer = _expert(
        "reviewer",
        "专业的审核者,负责质量控制",
        "你是一个专业的审核者,负责检查内容质量并提出改进建议。请在完成后说'TERMINATE'。",
    )

    # Round-robin turn taking, capped at 15 messages as a safety net.
    return RoundRobinGroupChat(
        name="expert_team",
        description="专家协作团队",
        participants=[researcher, writer, reviewer],
        termination_condition=MaxMessageTermination(15),
    )
# 使用示例
team = await create_expert_team()
result = await team.run(
task="写一篇关于人工智能在医疗领域应用的技术报告,要求内容准确、结构清晰、语言专业。"
)
层次化决策模式
class HierarchicalTeam:
    """Three-layer decision team: operational experts → manager → executive."""

    def __init__(self):
        # Operational layer: domain experts the manager can invoke as tools.
        self.operational_agents = [
            self.create_data_analyst(),
            self.create_market_researcher(),
            self.create_technical_expert(),
        ]
        # Management layer: coordinates the experts.
        self.manager = self.create_manager()
        # Decision layer: reviews the manager's synthesis and decides.
        self.executive = self.create_executive()

    def create_data_analyst(self):
        """Factory for the data-analysis expert."""
        return AssistantAgent(
            name="data_analyst",
            model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
            description="数据分析专家",
            system_message="你是数据分析专家,负责分析数据并提供洞察。"
        )

    # BUGFIX: __init__ called these two factories but they were never
    # defined, so constructing the team raised AttributeError.
    def create_market_researcher(self):
        """Factory for the market-research expert."""
        return AssistantAgent(
            name="market_researcher",
            model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
            description="市场研究专家",
            system_message="你是市场研究专家,负责分析市场趋势和竞争情况。"
        )

    def create_technical_expert(self):
        """Factory for the technical-feasibility expert."""
        return AssistantAgent(
            name="technical_expert",
            model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
            description="技术专家",
            system_message="你是技术专家,负责评估技术可行性并提供技术建议。"
        )

    def create_manager(self):
        """Factory for the coordinating manager agent."""
        return AssistantAgent(
            name="manager",
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            tools=[AgentTool(agent) for agent in self.operational_agents],
            description="项目经理,协调各专家工作",
            system_message="""
你是项目经理,负责:
1. 分析任务需求
2. 分配工作给合适的专家
3. 汇总专家意见
4. 提出初步建议
"""
        )

    def create_executive(self):
        """Factory for the final decision-maker agent."""
        return AssistantAgent(
            name="executive",
            model_client=OpenAIChatCompletionClient(model="gpt-4o"),
            tools=[AgentTool(self.manager)],  # the executive only talks to the manager
            description="高级决策者",
            system_message="""
你是高级决策者,负责:
1. 审查管理层建议
2. 考虑战略因素
3. 做出最终决策
4. 说明决策理由
"""
        )

    async def make_decision(self, task: str):
        """Run the full hierarchy: executive consults the manager, who consults experts."""
        return await self.executive.run(task=task)
# 使用示例
team = HierarchicalTeam()
decision = await team.make_decision(
"我们公司是否应该投资开发一个新的AI产品?请进行全面分析并给出建议。"
)
3. 性能优化实战
3.1 消息处理优化
批处理优化
class BatchProcessor:
    """Batches concurrent agent requests to cut per-message overhead.

    Callers submit work via :meth:`process_message`; a lazily-started
    background task drains the queue in batches of ``batch_size`` (or
    whatever arrives within ``timeout`` seconds per slot) and dispatches
    each batch grouped by target agent.
    """

    def __init__(self, batch_size: int = 10, timeout: float = 1.0):
        self.batch_size = batch_size          # max messages per batch
        self.timeout = timeout                # max seconds to wait per extra item
        self.message_queue = asyncio.Queue()
        self.result_futures = {}              # message_id -> Future awaiting a result
        self.processing_task = None           # background drain task

    async def process_message(self, message: "Any", agent: "AssistantAgent") -> "Any":
        """Enqueue *message* for *agent* and await its (batched) result."""
        message_id = str(uuid.uuid4())
        future = asyncio.Future()
        self.result_futures[message_id] = future
        await self.message_queue.put((message_id, message, agent))
        # Lazily (re)start the background drain task.
        if self.processing_task is None or self.processing_task.done():
            self.processing_task = asyncio.create_task(self._process_batch())
        return await future

    async def _process_batch(self):
        """Drain the queue batch-by-batch until it stays empty.

        BUGFIX: the original processed a single batch and returned, so
        messages enqueued while a batch was in flight were stranded until
        the next submit restarted the task.
        """
        while True:
            batch = []
            try:
                # Collect up to batch_size items, waiting at most `timeout`
                # for each additional one.
                while len(batch) < self.batch_size:
                    try:
                        item = await asyncio.wait_for(
                            self.message_queue.get(),
                            timeout=self.timeout
                        )
                        batch.append(item)
                    except asyncio.TimeoutError:
                        break
                if not batch:
                    return  # queue stayed empty for `timeout` seconds — stop
                # Group the batch by target agent.
                agent_groups = {}
                for message_id, message, agent in batch:
                    agent_groups.setdefault(agent, []).append((message_id, message))
                # Run each agent's group concurrently.
                tasks = [
                    self._process_agent_group(agent, messages)
                    for agent, messages in agent_groups.items()
                ]
                await asyncio.gather(*tasks, return_exceptions=True)
            except Exception as e:
                # Fail every future still pending from this batch.
                for message_id, _, _ in batch:
                    self._resolve(message_id, error=e)

    def _resolve(self, message_id: str, result=None, error=None):
        """Complete and discard the future for *message_id*, exactly once.

        BUGFIX: the original could call set_result/set_exception on an
        already-completed (e.g. cancelled) future, raising InvalidStateError.
        """
        future = self.result_futures.pop(message_id, None)
        if future is None or future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    async def _process_agent_group(self, agent: "AssistantAgent",
                                   messages: "List[Tuple[str, Any]]"):
        """Run one agent over its share of the batch, resolving each future."""
        for message_id, message in messages:
            try:
                result = await agent.run(task=str(message))
                self._resolve(message_id, result=result)
            except Exception as e:
                self._resolve(message_id, error=e)
# 使用示例
batch_processor = BatchProcessor(batch_size=5, timeout=0.5)
agent = AssistantAgent("assistant", model_client)
# 批量处理消息
tasks = [
batch_processor.process_message(f"问题{i}", agent)
for i in range(20)
]
results = await asyncio.gather(*tasks)
连接池优化
class ModelClientPool:
    """Bounded pool of reusable model clients.

    ``get_client`` reuses an idle client when available, creates a new one
    while under ``max_connections``, and otherwise waits for a client to be
    returned. ``hit_rate`` in :meth:`get_stats` is hits / total requests.
    """

    def __init__(self, max_connections: int = 10, model_config: dict = None):
        self.max_connections = max_connections
        self.model_config = model_config or {}
        self.available_clients = asyncio.Queue(maxsize=max_connections)
        self.total_clients = 0
        self.lock = asyncio.Lock()            # guards total_clients bookkeeping
        self.stats = {
            'requests': 0,
            'cache_hits': 0,
            'cache_misses': 0
        }

    async def get_client(self) -> "OpenAIChatCompletionClient":
        """Borrow a client from the pool (reuse, create, or wait)."""
        # BUGFIX: 'requests' was never incremented, so hit_rate was
        # computed against max(1, 0) and reported nonsense.
        self.stats['requests'] += 1
        try:
            # Fast path: an idle client is ready.
            client = self.available_clients.get_nowait()
            self.stats['cache_hits'] += 1
            return client
        except asyncio.QueueEmpty:
            async with self.lock:
                if self.total_clients < self.max_connections:
                    client = self._create_client()
                    self.total_clients += 1
                    self.stats['cache_misses'] += 1
                    return client
            # Pool exhausted: wait OUTSIDE the lock for a returned client.
            # BUGFIX: the original awaited while holding the lock, blocking
            # all other borrowers (including creations) on one slow wait.
            client = await self.available_clients.get()
            self.stats['cache_hits'] += 1
            return client

    async def return_client(self, client: "OpenAIChatCompletionClient"):
        """Return a borrowed client; close it when the pool is already full."""
        try:
            self.available_clients.put_nowait(client)
        except asyncio.QueueFull:
            await self._close_client(client)
            async with self.lock:
                self.total_clients -= 1

    def _create_client(self) -> "OpenAIChatCompletionClient":
        """Build a fresh client from the pool's model_config."""
        return OpenAIChatCompletionClient(
            model=self.model_config.get('model', 'gpt-4o-mini'),
            api_key=self.model_config.get('api_key'),
            base_url=self.model_config.get('base_url'),
            max_retries=self.model_config.get('max_retries', 3),
            timeout=self.model_config.get('timeout', 30.0)
        )

    async def _close_client(self, client: "OpenAIChatCompletionClient"):
        """Release client resources (no-op placeholder)."""
        pass

    def get_stats(self) -> dict:
        """Return counters plus current pool occupancy and hit rate."""
        return {
            **self.stats,
            'total_clients': self.total_clients,
            'available_clients': self.available_clients.qsize(),
            'hit_rate': self.stats['cache_hits'] / max(1, self.stats['requests'])
        }
# 使用示例
client_pool = ModelClientPool(max_connections=5, model_config={
'model': 'gpt-4o-mini',
'api_key': 'your-api-key'
})
class PooledAgent(AssistantAgent):
    """Assistant agent that borrows a model client from a shared pool per call."""

    def __init__(self, name: str, client_pool: ModelClientPool):
        # Keep the pool reference before the base initializer runs.
        self.client_pool = client_pool
        super().__init__(
            name=name,
            model_client=None,  # borrowed lazily inside on_messages
            description="使用连接池的高性能代理"
        )

    async def on_messages(self, messages, cancellation_token):
        """Borrow a pooled client for the duration of one message exchange."""
        borrowed = await self.client_pool.get_client()
        try:
            previous = self._model_client
            self._model_client = borrowed
            # Delegate the actual handling to the base implementation.
            return await super().on_messages(messages, cancellation_token)
        finally:
            # Always give the client back and restore the previous one.
            await self.client_pool.return_client(borrowed)
            self._model_client = previous
3.2 内存优化
对象池化
class MessageContextPool:
    """Thread-safe pool of reusable MessageContext objects."""

    def __init__(self, max_size: int = 100):
        self.max_size = max_size      # cap on idle objects retained
        self.pool = []                # idle, reset contexts
        self.lock = threading.Lock()
        self.created_count = 0        # fresh allocations
        self.reused_count = 0         # pool hits

    def get_context(self) -> "MessageContext":
        """Hand out a pooled context, allocating a new one when empty."""
        with self.lock:
            if not self.pool:
                self.created_count += 1
                return MessageContext()
            self.reused_count += 1
            return self.pool.pop()

    def return_context(self, context: "MessageContext"):
        """Reset *context* and reclaim it; drop it when the pool is full."""
        context.reset()  # wipe state so the next borrower starts clean
        with self.lock:
            if len(self.pool) < self.max_size:
                self.pool.append(context)

    def get_stats(self) -> dict:
        """Report allocation/reuse counters and current occupancy."""
        handed_out = self.created_count + self.reused_count
        return {
            'created_count': self.created_count,
            'reused_count': self.reused_count,
            'pool_size': len(self.pool),
            'reuse_rate': self.reused_count / max(1, handed_out)
        }


class MessageContext:
    """Reusable message-context record; reset() blanks every field."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all fields so the instance can safely be pooled."""
        self.message_id = None
        self.sender = None
        self.recipient = None
        self.timestamp = None
        self.properties = {}
        self.cancellation_token = None
内存监控
class MemoryMonitor:
    """Polls system/process memory and fires callbacks per severity level."""

    def __init__(self, warning_threshold: float = 0.8, critical_threshold: float = 0.9):
        self.warning_threshold = warning_threshold
        self.critical_threshold = critical_threshold
        # One callback list per severity level.
        self.callbacks = {
            'warning': [],
            'critical': [],
            'normal': []
        }

    def add_callback(self, level: str, callback: Callable):
        """Register *callback* for *level*; unknown levels are ignored."""
        if level in self.callbacks:
            self.callbacks[level].append(callback)

    def check_memory(self) -> dict:
        """Sample memory usage, classify it, and notify matching callbacks."""
        memory_info = psutil.virtual_memory()
        usage_percent = memory_info.percent / 100.0
        process_memory = psutil.Process().memory_info()

        status = {
            'system_usage_percent': usage_percent,
            'process_memory_mb': process_memory.rss / 1024 / 1024,
            'available_memory_mb': memory_info.available / 1024 / 1024,
            'level': 'normal'
        }
        # Classify: critical wins over warning, otherwise normal.
        if usage_percent >= self.critical_threshold:
            status['level'] = 'critical'
        elif usage_percent >= self.warning_threshold:
            status['level'] = 'warning'

        # Notify every callback registered for this level; one failing
        # callback must not prevent the rest from running.
        for callback in self.callbacks[status['level']]:
            try:
                callback(status)
            except Exception as e:
                logger.error(f"内存监控回调失败: {e}")
        return status

    async def start_monitoring(self, interval: float = 30.0):
        """Run check_memory() forever, every *interval* seconds."""
        while True:
            try:
                self.check_memory()
                await asyncio.sleep(interval)
            except Exception as e:
                logger.error(f"内存监控异常: {e}")
                await asyncio.sleep(interval)
# 使用示例
memory_monitor = MemoryMonitor()
def on_memory_warning(status):
logger.warning(f"内存使用率过高: {status['system_usage_percent']:.1%}")
# 触发垃圾回收
gc.collect()
def on_memory_critical(status):
logger.critical(f"内存使用率危险: {status['system_usage_percent']:.1%}")
# 清理缓存
clear_caches()
# 限制新请求
enable_backpressure()
memory_monitor.add_callback('warning', on_memory_warning)
memory_monitor.add_callback('critical', on_memory_critical)
# 启动监控
asyncio.create_task(memory_monitor.start_monitoring())
4. 生产部署实战
4.1 容器化部署
Dockerfile最佳实践
# Multi-stage build — Python variant: build deps stay out of the final image
FROM python:3.11-slim as builder
# Working directory for the build stage
WORKDIR /app
# System packages needed to compile wheels; apt cache removed to keep layers small
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy only the dependency manifest first (better layer caching)
COPY requirements.txt .
# Install into the user site (~/.local) so it can be copied wholesale below
RUN pip install --no-cache-dir --user -r requirements.txt
# Production image: slim base, no compilers
FROM python:3.11-slim
# Run as a dedicated non-root user
RUN groupadd -r autogen && useradd -r -g autogen autogen
# Working directory for the application
WORKDIR /app
# Bring in the pre-built dependencies from the builder stage
COPY --from=builder /root/.local /home/autogen/.local
# Copy application code, owned by the runtime user
COPY --chown=autogen:autogen . .
# Make user-site scripts resolvable and keep Python output unbuffered
ENV PATH=/home/autogen/.local/bin:$PATH
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Container health check against the app's /health endpoint
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')" || exit 1
# Drop privileges
USER autogen
# Application port
EXPOSE 8000
# Start the ASGI server
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose配置
version: '3.8'

services:
  # The AutoGen application itself
  autogen-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/autogen
    depends_on:
      - redis
      - postgres
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 1G
          cpus: '0.5'
        reservations:
          memory: 512M
          cpus: '0.25'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  # Redis cache (AOF persistence enabled)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  # PostgreSQL database
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=autogen
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    restart: unless-stopped

  # Prometheus metrics collection
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    restart: unless-stopped

  # Grafana dashboards
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
    restart: unless-stopped

volumes:
  redis_data:
  postgres_data:
  prometheus_data:
  grafana_data:
4.2 Kubernetes部署
部署配置
# autogen-deployment.yaml — Deployment + Service + HPA for the AutoGen app
apiVersion: apps/v1
kind: Deployment
metadata:
  name: autogen-app
  labels:
    app: autogen-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: autogen-app
  template:
    metadata:
      labels:
        app: autogen-app
    spec:
      containers:
        - name: autogen-app
          image: autogen-app:latest
          ports:
            - containerPort: 8000
          env:
            # Secrets are injected from the autogen-secrets Secret object
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: autogen-secrets
                  key: openai-api-key
            - name: REDIS_URL
              value: "redis://redis-service:6379"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: autogen-secrets
                  key: database-url
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: logs
              mountPath: /app/logs
      volumes:
        - name: logs
          emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: autogen-service
spec:
  selector:
    app: autogen-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: autogen-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: autogen-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
配置管理
# autogen-configmap.yaml — application config plus the secrets it references
apiVersion: v1
kind: ConfigMap
metadata:
  name: autogen-config
data:
  app.yaml: |
    server:
      host: "0.0.0.0"
      port: 8000
      workers: 4
    logging:
      level: "INFO"
      format: "json"
    cache:
      type: "redis"
      ttl: 3600
    monitoring:
      enabled: true
      metrics_port: 9090
    agents:
      max_concurrent: 100
      timeout: 30
      retry_attempts: 3
---
apiVersion: v1
kind: Secret
metadata:
  name: autogen-secrets
type: Opaque
stringData:
  openai-api-key: "your-openai-api-key"
  database-url: "postgresql://user:pass@postgres:5432/autogen"
  redis-password: "your-redis-password"
4.3 监控和告警
Prometheus配置
# prometheus.yml — scrape targets and alertmanager wiring
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "autogen_rules.yml"

scrape_configs:
  # The AutoGen app exposes metrics on :9090/metrics
  - job_name: 'autogen-app'
    static_configs:
      - targets: ['autogen-service:9090']
    metrics_path: /metrics
    scrape_interval: 10s

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093
告警规则
# autogen_rules.yml — Prometheus alerting rules for the AutoGen app
groups:
  - name: autogen_alerts
    rules:
      # Error rate above 5% of requests over the last 5 minutes
      - alert: HighErrorRate
        expr: rate(autogen_requests_failed_total[5m]) / rate(autogen_requests_total[5m]) > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "AutoGen应用错误率过高"
          description: "错误率 {{ $value | humanizePercentage }} 超过5%"

      # p95 latency above 2 seconds
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(autogen_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "AutoGen应用响应时间过长"
          description: "95%分位响应时间 {{ $value }}s 超过2秒"

      # Resident memory above 80% of node memory
      - alert: HighMemoryUsage
        expr: (process_resident_memory_bytes / node_memory_MemTotal_bytes) > 0.8
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "AutoGen应用内存使用率过高"
          description: "内存使用率 {{ $value | humanizePercentage }} 超过80%"

      # Any agent reporting as offline
      - alert: AgentOffline
        expr: autogen_agents_count{status="offline"} > 0
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "有代理离线"
          description: "{{ $value }}个代理处于离线状态"
Grafana仪表板
{
  "dashboard": {
    "title": "AutoGen监控仪表板",
    "panels": [
      {
        "title": "请求QPS",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(autogen_requests_total[1m])",
            "legendFormat": "{{agent_type}}"
          }
        ]
      },
      {
        "title": "响应时间分布",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rate(autogen_request_duration_seconds_bucket[1m])",
            "format": "heatmap"
          }
        ]
      },
      {
        "title": "错误率",
        "type": "stat",
        "targets": [
          {
            "expr": "rate(autogen_requests_failed_total[5m]) / rate(autogen_requests_total[5m])",
            "legendFormat": "错误率"
          }
        ]
      },
      {
        "title": "活跃代理数",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(autogen_agents_count{status=\"online\"})",
            "legendFormat": "在线代理"
          }
        ]
      }
    ]
  }
}
5. 故障排查指南
5.1 常见问题诊断
性能问题排查
class PerformanceDiagnostics:
    """Measures a single agent call end-to-end and suggests optimizations."""

    def __init__(self):
        self.metrics = {}
        self.profiler = None

    async def diagnose_slow_response(self, agent: AssistantAgent, test_message: str):
        """Time one call, break it down by stage, and print a report."""
        print("🔍 开始性能诊断...")

        # 1) End-to-end wall-clock measurement.
        started = time.time()
        await agent.run(task=test_message)
        total_time = time.time() - started
        print(f"📊 总响应时间: {total_time:.2f}秒")

        # 2) Per-stage breakdown (simulated — see _detailed_timing_analysis).
        timings = await self._detailed_timing_analysis(agent, test_message)
        print("⏱️ 详细计时分析:")
        for stage, duration in timings.items():
            share = (duration / total_time) * 100
            print(f"  {stage}: {duration:.2f}s ({share:.1f}%)")

        # 3) Current process resource snapshot.
        resource_usage = self._analyze_resource_usage()
        print(f"💾 内存使用: {resource_usage['memory_mb']:.1f}MB")
        print(f"🔥 CPU使用: {resource_usage['cpu_percent']:.1f}%")

        # 4) Advice derived from the numbers above.
        suggestions = self._generate_optimization_suggestions(timings, resource_usage)
        print("\n💡 优化建议:")
        for suggestion in suggestions:
            print(f"  • {suggestion}")

        return {
            'total_time': total_time,
            'timings': timings,
            'resource_usage': resource_usage,
            'suggestions': suggestions
        }

    async def _detailed_timing_analysis(self, agent: AssistantAgent, message: str):
        """Stage-by-stage timing. NOTE: stages are simulated with sleeps —
        a stand-in for real instrumentation hooks."""
        simulated_stages = [
            ('message_preprocessing', 0.01),
            ('model_inference', 0.5),
            ('tool_execution', 0.1),
            ('response_postprocessing', 0.01),
        ]
        timings = {}
        for stage, delay in simulated_stages:
            begin = time.time()
            await asyncio.sleep(delay)
            timings[stage] = time.time() - begin
        return timings

    def _analyze_resource_usage(self):
        """Snapshot memory/CPU/thread/file usage of this process."""
        proc = psutil.Process()
        return {
            'memory_mb': proc.memory_info().rss / 1024 / 1024,
            'cpu_percent': proc.cpu_percent(),
            'thread_count': proc.num_threads(),
            'open_files': len(proc.open_files())
        }

    def _generate_optimization_suggestions(self, timings, resource_usage):
        """Map timing/resource thresholds to human-readable advice."""
        timing_rules = [
            ('model_inference', 1.0, "考虑使用更快的模型或启用模型缓存"),
            ('tool_execution', 0.5, "优化工具执行逻辑或使用工具缓存"),
        ]
        resource_rules = [
            ('memory_mb', 500, "内存使用较高,考虑启用对象池化"),
            ('cpu_percent', 80, "CPU使用率高,考虑启用异步处理"),
            ('thread_count', 50, "线程数过多,检查是否有线程泄漏"),
        ]
        suggestions = [msg for key, limit, msg in timing_rules
                       if timings.get(key, 0) > limit]
        suggestions += [msg for key, limit, msg in resource_rules
                        if resource_usage[key] > limit]
        return suggestions
# 使用示例
diagnostics = PerformanceDiagnostics()
agent = AssistantAgent("test_agent", model_client)
# 运行诊断
diagnosis = await diagnostics.diagnose_slow_response(
agent,
"请分析一下人工智能的发展趋势"
)
内存泄漏检测
import tracemalloc
import gc
from collections import defaultdict
class MemoryLeakDetector:
    """Detects memory leaks by diffing tracemalloc snapshots over time."""

    def __init__(self):
        self.snapshots = []   # [{'label', 'snapshot', 'timestamp'}, ...]
        self.tracking = False

    def start_tracking(self):
        """Enable tracemalloc and record a baseline snapshot."""
        tracemalloc.start()
        self.tracking = True
        self.take_snapshot("baseline")
        print("🔍 开始内存泄漏检测...")

    def take_snapshot(self, label: str):
        """Record a labelled snapshot; no-op unless tracking is active."""
        if not self.tracking:
            return
        # Collect garbage first so the snapshot reflects real retention.
        gc.collect()
        self.snapshots.append({
            'label': label,
            'snapshot': tracemalloc.take_snapshot(),
            'timestamp': time.time()
        })
        print(f"📸 拍摄内存快照: {label}")

    def analyze_leaks(self, top_n: int = 10):
        """Diff the newest snapshot against the baseline and report growth."""
        if len(self.snapshots) < 2:
            print("❌ 需要至少2个快照才能分析泄漏")
            return
        print("\n🔍 内存泄漏分析报告:")
        print("=" * 50)
        oldest = self.snapshots[0]['snapshot']
        newest = self.snapshots[-1]['snapshot']
        top_stats = newest.compare_to(oldest, 'lineno')
        print(f"📊 内存增长最多的 {top_n} 个位置:")
        for index, stat in enumerate(top_stats[:top_n], 1):
            print(f"{index}. {stat}")
        self._analyze_object_growth()
        print("\n💡 修复建议:")
        for suggestion in self._generate_leak_fix_suggestions(top_stats):
            print(f"  • {suggestion}")

    def _analyze_object_growth(self):
        """Print the ten most numerous live object types."""
        print("\n📈 对象类型增长分析:")
        obj_stats = defaultdict(int)
        for obj in gc.get_objects():
            obj_stats[type(obj).__name__] += 1
        ranked = sorted(obj_stats.items(), key=lambda kv: kv[1], reverse=True)
        for obj_type, count in ranked[:10]:
            print(f"  {obj_type}: {count}")

    def _generate_leak_fix_suggestions(self, top_stats):
        """Heuristic advice keyed off where the biggest growth happened."""
        suggestions = []
        for stat in top_stats[:5]:
            location = stat.traceback.format()[-1]
            if 'asyncio' in location:
                suggestions.append("检查异步任务是否正确清理,避免任务泄漏")
            elif 'cache' in location.lower():
                suggestions.append("检查缓存是否有TTL设置,避免缓存无限增长")
            elif 'list' in str(stat) or 'dict' in str(stat):
                suggestions.append("检查容器对象是否及时清理,避免引用累积")
            elif 'agent' in location.lower():
                suggestions.append("检查代理对象是否正确释放,避免代理实例泄漏")
        if not suggestions:
            suggestions.append("内存增长可能是正常的业务增长,继续监控")
        return suggestions

    def stop_tracking(self):
        """Disable tracemalloc tracking (safe to call repeatedly)."""
        if self.tracking:
            tracemalloc.stop()
            self.tracking = False
            print("⏹️ 停止内存泄漏检测")
# 使用示例
leak_detector = MemoryLeakDetector()
# 开始检测
leak_detector.start_tracking()
# 运行一些可能导致内存泄漏的操作
for i in range(100):
agent = AssistantAgent(f"agent_{i}", model_client)
# 模拟一些操作
await agent.run(task="简单测试")
if i % 20 == 0:
leak_detector.take_snapshot(f"iteration_{i}")
# 分析结果
leak_detector.analyze_leaks()
leak_detector.stop_tracking()
5.2 日志分析工具
import re
from datetime import datetime, timedelta
from collections import Counter, defaultdict
class LogAnalyzer:
    """Extracts errors, latencies and agent activity from an application log."""

    def __init__(self, log_file: str):
        self.log_file = log_file
        # Pre-compiled extraction patterns, keyed by what they capture.
        self.patterns = {
            'error': re.compile(r'ERROR.*?(\w+Error|Exception).*?$', re.MULTILINE),
            'warning': re.compile(r'WARNING.*?$', re.MULTILINE),
            'performance': re.compile(r'处理时间.*?(\d+\.?\d*).*?ms', re.MULTILINE),
            'agent': re.compile(r'代理.*?(\w+).*?(成功|失败)', re.MULTILINE),
            'timestamp': re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})')
        }

    def _read_log(self) -> str:
        """Read the whole log file as UTF-8 text."""
        with open(self.log_file, 'r', encoding='utf-8') as f:
            return f.read()

    def analyze_errors(self, hours: int = 24):
        """Count error occurrences per exception type.

        NOTE(review): `hours` only appears in the printed header — the whole
        file is scanned regardless. Confirm whether time filtering is wanted.
        """
        print(f"🔍 分析最近 {hours} 小时的错误日志...")
        content = self._read_log()
        error_counter = Counter(self.patterns['error'].findall(content))
        print(f"📊 发现 {sum(error_counter.values())} 个错误:")
        for error_type, count in error_counter.most_common(10):
            print(f"  {error_type}: {count} 次")
        self._analyze_error_trends(content)
        return error_counter

    def analyze_performance(self):
        """Summarize request processing times found in the log."""
        print("⚡ 分析性能指标...")
        content = self._read_log()
        times = [float(t) for t in self.patterns['performance'].findall(content)]
        if times:
            avg_time = sum(times) / len(times)
            print(f"📈 性能统计:")
            print(f"  平均处理时间: {avg_time:.2f}ms")
            print(f"  最大处理时间: {max(times):.2f}ms")
            print(f"  最小处理时间: {min(times):.2f}ms")
            print(f"  总请求数: {len(times)}")
            # A "slow" request is anything above twice the average.
            slow_requests = [t for t in times if t > avg_time * 2]
            if slow_requests:
                print(f"  慢请求数: {len(slow_requests)} ({len(slow_requests)/len(times)*100:.1f}%)")
        return times

    def analyze_agent_activity(self):
        """Tally success/failure counts per agent."""
        print("🤖 分析代理活动...")
        content = self._read_log()
        agent_stats = defaultdict(lambda: {'成功': 0, '失败': 0})
        for agent, status in self.patterns['agent'].findall(content):
            agent_stats[agent][status] += 1
        print("📊 代理活动统计:")
        for agent, stats in agent_stats.items():
            total = stats['成功'] + stats['失败']
            success_rate = stats['成功'] / total * 100 if total > 0 else 0
            print(f"  {agent}: 成功 {stats['成功']}, 失败 {stats['失败']}, 成功率 {success_rate:.1f}%")
        return agent_stats

    def _analyze_error_trends(self, content: str):
        """Bucket ERROR lines by hour and print the recent trend."""
        hourly_errors = defaultdict(int)
        for line in content.split('\n'):
            if 'ERROR' not in line:
                continue
            match = self.patterns['timestamp'].search(line)
            if not match:
                continue
            try:
                dt = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S')
            except ValueError:
                continue  # malformed timestamp — skip the line
            hourly_errors[dt.strftime('%Y-%m-%d %H:00')] += 1
        if hourly_errors:
            print("\n📈 错误趋势 (按小时):")
            for hour, count in sorted(hourly_errors.items())[-24:]:
                print(f"  {hour}: {count} 个错误")

    def generate_report(self):
        """Run all analyses and return a combined report dict."""
        print("📋 生成日志分析报告...")
        print("=" * 60)
        errors = self.analyze_errors()
        print()
        performance = self.analyze_performance()
        print()
        agents = self.analyze_agent_activity()
        print()
        suggestions = self._generate_suggestions(errors, performance, agents)
        print("💡 优化建议:")
        for suggestion in suggestions:
            print(f"  • {suggestion}")
        return {
            'errors': errors,
            'performance': performance,
            'agents': agents,
            'suggestions': suggestions
        }

    def _generate_suggestions(self, errors, performance, agents):
        """Derive actionable advice from the three analyses."""
        suggestions = []
        # Error-driven advice keys off the single most frequent error type.
        if errors:
            top_error = errors.most_common(1)[0][0]
            if 'TimeoutError' in top_error:
                suggestions.append("超时错误较多,考虑增加超时时间或优化处理逻辑")
            elif 'ConnectionError' in top_error:
                suggestions.append("连接错误较多,检查网络连接和服务可用性")
        # Latency advice: flag averages above one second.
        if performance and sum(performance) / len(performance) > 1000:
            suggestions.append("平均响应时间较长,考虑性能优化")
        # Per-agent advice: flag success rates under 90%.
        for agent, stats in agents.items():
            total = stats['成功'] + stats['失败']
            if total > 0 and stats['成功'] / total < 0.9:
                suggestions.append(f"代理 {agent} 成功率较低,需要检查和优化")
        if not suggestions:
            suggestions.append("系统运行良好,继续监控")
        return suggestions
# 使用示例
analyzer = LogAnalyzer('/app/logs/autogen.log')
report = analyzer.generate_report()
6. 总结
本实战指南涵盖了AutoGen框架从开发到生产部署的完整流程,包括:
6.1 核心要点
- 架构设计:遵循单一职责原则,合理设计代理和团队结构
- 性能优化:使用批处理、连接池、对象池等技术提升性能
- 内存管理:实施内存监控和泄漏检测,确保系统稳定
- 容器化部署:使用Docker和Kubernetes实现可扩展部署
- 监控告警:建立完善的监控体系,及时发现和解决问题
- 故障排查:掌握常见问题的诊断和解决方法
6.2 最佳实践总结
- 开发阶段:注重代码质量,编写充分的测试
- 测试阶段:进行性能测试和压力测试
- 部署阶段:使用渐进式部署策略
- 运维阶段:建立完善的监控和告警机制
- 优化阶段:基于监控数据持续优化性能
通过遵循这些实战经验和最佳实践,可以构建出高性能、高可用的AutoGen应用系统。