概述

AutoGen服务网关是分布式多代理系统的核心组件,负责代理的注册发现、消息路由转发和集群管理。本文档深入分析其架构设计、gRPC通信机制和关键实现细节。

1. 服务网关整体架构

1.1 架构设计图

graph TB
    subgraph "AutoGen 服务网关架构"
        subgraph "网关服务层 (Gateway Service Layer)"
            GS[GatewayService - 网关核心服务]
            AR[AgentRegistry - 代理注册中心]
            MR[MessageRouter - 消息路由器]
            LB[LoadBalancer - 负载均衡器]
        end
        
        subgraph "通信层 (Communication Layer)"
            GRPC[gRPC Server - gRPC服务器]
            WS[WebSocket Server - WebSocket服务器]
            HTTP[HTTP API - REST接口]
            MQTT[MQTT Broker - MQTT代理]
        end
        
        subgraph "工作节点层 (Worker Node Layer)"
            WN1[WorkerNode-1 - 工作节点1]
            WN2[WorkerNode-2 - 工作节点2]
            WN3[WorkerNode-3 - 工作节点3]
            WNX[WorkerNode-N - 工作节点N]
        end
        
        subgraph "存储层 (Storage Layer)"
            REDIS[Redis - 缓存和会话]
            ETCD[etcd - 配置和发现]
            DB[Database - 持久化存储]
            MQ[MessageQueue - 消息队列]
        end
        
        subgraph "监控层 (Monitoring Layer)"
            PROM[Prometheus - 指标收集]
            GRAF[Grafana - 可视化]
            JAEGER[Jaeger - 链路追踪]
            LOG[Logging - 日志聚合]
        end
        
        subgraph "客户端层 (Client Layer)"
            PY[Python Client - Python客户端]
            NET[.NET Client - .NET客户端]
            WEB[Web Client - Web客户端]
            CLI[CLI Client - 命令行客户端]
        end
    end
    
    %% 连接关系
    GS --> AR
    GS --> MR
    GS --> LB
    
    GRPC --> GS
    WS --> GS
    HTTP --> GS
    
    GS --> WN1
    GS --> WN2
    GS --> WN3
    GS --> WNX
    
    AR --> REDIS
    AR --> ETCD
    MR --> MQ
    
    GS --> PROM
    GS --> JAEGER
    
    PY --> GRPC
    NET --> GRPC
    WEB --> WS
    CLI --> HTTP
    
    style GS fill:#e1f5fe
    style GRPC fill:#f3e5f5
    style AR fill:#e8f5e8
    style REDIS fill:#fff3e0

1.2 核心API入口分析

1. GatewayService.RegisterAgentAsync() - 代理注册

入口函数

public async Task<RegisterAgentResponse> RegisterAgentAsync(
    RegisterAgentRequest request,
    ServerCallContext context)
{
    /*
     * Registers an agent hosted on a worker node with the gateway.
     *
     * Steps, in order:
     *  1. Validate the request (ValidateRegistrationRequest).
     *  2. Build the AgentId from agent type, key and worker node id. A missing
     *     or empty key is replaced with a freshly generated GUID.
     *  3. Look the id up in the registry. A hit owned by the same worker node
     *     is answered as success (idempotent re-registration) with the same
     *     connection details a first registration returns; a hit owned by a
     *     different node is answered with AGENT_ID_CONFLICT.
     *  4-8. Build the AgentRegistration, store it in the registry, configure
     *     routing rules, start the health check and notify the cluster manager.
     *  9-10. Log the registration and increment the registration metric.
     *
     * Parameters:
     *  request - registration data sent by the worker node.
     *  context - gRPC call context (not read by this method).
     *
     * Returns: a RegisterAgentResponse. Failures are reported through
     * Success / ErrorCode / ErrorMessage instead of exceptions; any exception
     * raised inside the method is logged and mapped to INTERNAL_ERROR.
     *
     * NOTE(review): steps 5-8 are awaited one after another and nothing in
     * this method rolls back an earlier step when a later one throws, so a
     * failure after step 5 leaves the agent in the registry without routing
     * rules or a health check. Confirm whether the callees compensate.
     */

    using var activity = ActivitySource.StartActivity("Gateway.RegisterAgent");
    activity?.SetTag("agent.type", request.AgentType);
    activity?.SetTag("worker.node", request.WorkerNodeId);

    try
    {
        // 1. Validate the registration request.
        var validationResult = await ValidateRegistrationRequest(request);
        if (!validationResult.IsValid)
        {
            return new RegisterAgentResponse
            {
                Success = false,
                ErrorMessage = validationResult.ErrorMessage,
                ErrorCode = "VALIDATION_FAILED"
            };
        }

        // 2. Build the agent identifier. An empty key must be treated like a
        //    missing one: generated message classes typically expose unset
        //    string fields as "" rather than null, so `??` alone would let an
        //    empty key through.
        var agentKey = string.IsNullOrEmpty(request.AgentKey)
            ? Guid.NewGuid().ToString()
            : request.AgentKey;

        var agentId = new AgentId
        {
            Type = request.AgentType,
            Key = agentKey,
            WorkerNodeId = request.WorkerNodeId
        };

        // 3. Check whether the agent is already registered.
        var existingAgent = await _agentRegistry.GetAgentAsync(agentId);
        if (existingAgent != null)
        {
            if (existingAgent.WorkerNodeId == request.WorkerNodeId)
            {
                // Idempotent path: same agent, same node. Return the same
                // connection details as a first registration so the caller
                // does not end up with an empty endpoint / zero heartbeat
                // interval after a retry.
                return new RegisterAgentResponse
                {
                    Success = true,
                    AgentId = agentId.ToString(),
                    GatewayEndpoint = _configuration.GatewayEndpoint,
                    HeartbeatInterval = _configuration.HeartbeatInterval,
                    Message = "代理已存在"
                };
            }

            // The id is already owned by a different worker node.
            return new RegisterAgentResponse
            {
                Success = false,
                ErrorMessage = "代理ID已被其他工作节点使用",
                ErrorCode = "AGENT_ID_CONFLICT"
            };
        }

        // 4. Build the registration record.
        var agentRegistration = new AgentRegistration
        {
            AgentId = agentId,
            AgentType = request.AgentType,
            WorkerNodeId = request.WorkerNodeId,
            Capabilities = request.Capabilities.ToList(),
            Metadata = request.Metadata.ToDictionary(kv => kv.Key, kv => kv.Value),
            RegisteredAt = DateTimeOffset.UtcNow,
            LastHeartbeat = DateTimeOffset.UtcNow,
            Status = AgentStatus.Online,
            Version = request.Version
        };

        // 5. Store the registration in the agent registry.
        await _agentRegistry.RegisterAgentAsync(agentRegistration);

        // 6. Configure message routing rules for the agent.
        await _messageRouter.ConfigureRoutingRules(agentId, request.RoutingRules);

        // 7. Start health checking for the agent.
        await _healthChecker.StartHealthCheck(agentId, request.HealthCheckConfig);

        // 8. Tell the other gateway nodes about the new agent.
        await _clusterManager.NotifyAgentRegistered(agentRegistration);

        // 9. Log the registration.
        _logger.LogInformation("代理注册成功: {AgentId} 在工作节点 {WorkerNodeId}", 
            agentId, request.WorkerNodeId);

        // 10. Update metrics.
        _metrics.IncrementAgentRegistrations(request.AgentType);

        return new RegisterAgentResponse
        {
            Success = true,
            AgentId = agentId.ToString(),
            GatewayEndpoint = _configuration.GatewayEndpoint,
            HeartbeatInterval = _configuration.HeartbeatInterval,
            Message = "代理注册成功"
        };
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "代理注册失败: {AgentType}", request.AgentType);

        // Do not leak exception details to the remote caller.
        return new RegisterAgentResponse
        {
            Success = false,
            ErrorMessage = "内部服务器错误",
            ErrorCode = "INTERNAL_ERROR"
        };
    }
}

调用链路关键函数

  1. AgentRegistry.RegisterAgentAsync() - 代理注册中心
public async Task RegisterAgentAsync(AgentRegistration registration)
{
    /*
     * Stores an agent registration and announces it.
     *
     * Inside a transaction obtained from _distributedTransaction the method:
     *  - writes the JSON-serialized registration to etcd under
     *    /agents/{type}/{key};
     *  - writes the same JSON to Redis under agent:{agentId} with an expiry of
     *    AgentCacheExpiration minutes;
     *  - updates the Redis indexes (UpdateAgentIndexes);
     *  - commits the transaction.
     * After the commit it fills the in-process cache (LocalCacheExpiration
     * minutes) and publishes an AgentRegisteredEvent.
     *
     * The in-process cache is deliberately populated only after the commit: it
     * cannot take part in the transaction, so writing it earlier would let a
     * failed commit leave an uncommitted registration visible on this node.
     *
     * Throws: AgentRegistrationException wrapping any failure (also logged).
     *
     * NOTE(review): the etcd/Redis calls do not receive the transaction
     * object; how they enlist in it is not visible here - confirm.
     */

    var agentId = registration.AgentId;

    try
    {
        // 1. Begin the distributed transaction.
        using var transaction = await _distributedTransaction.BeginAsync();

        // Serialize once; etcd and Redis receive the identical payload.
        var serializedRegistration = JsonSerializer.Serialize(registration);

        // 2. Persist to etcd.
        var etcdKey = $"/agents/{agentId.Type}/{agentId.Key}";
        await _etcdClient.PutAsync(etcdKey, serializedRegistration);

        // 3. Update the Redis cache.
        var redisKey = $"agent:{agentId}";
        await _redisDatabase.StringSetAsync(redisKey, serializedRegistration, 
            TimeSpan.FromMinutes(_configuration.AgentCacheExpiration));

        // 4. Update the lookup indexes.
        await UpdateAgentIndexes(registration);

        // 5. Commit.
        await transaction.CommitAsync();

        // 6. Only now expose the registration through the in-process cache.
        _localCache.Set(agentId.ToString(), registration, 
            TimeSpan.FromMinutes(_configuration.LocalCacheExpiration));

        // 7. Publish the registration event.
        await _eventBus.PublishAsync(new AgentRegisteredEvent
        {
            AgentId = agentId,
            Registration = registration,
            Timestamp = DateTimeOffset.UtcNow
        });

        _logger.LogDebug("代理注册到存储系统: {AgentId}", agentId);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "代理注册失败: {AgentId}", agentId);
        throw new AgentRegistrationException($"无法注册代理 {agentId}", ex);
    }
}

private async Task UpdateAgentIndexes(AgentRegistration registration)
{
    /*
     * Adds the agent to the Redis lookup sets and updates the counters.
     *
     * Sets written (member = the agent id string):
     *  - index:type:{AgentType}
     *  - index:node:{WorkerNodeId}
     *  - index:capability:{capability}  (one per capability)
     *  - index:status:{Status}
     *
     * Counters (hash fields):
     *  - stats:agents[AgentType]
     *  - stats:nodes[WorkerNodeId]
     *
     * A counter is incremented only when the corresponding set add actually
     * inserted a new member, so indexing the same agent twice does not inflate
     * the statistics.
     * NOTE(review): this relies on SetAddAsync returning true only for a newly
     * added member (StackExchange.Redis IDatabase semantics) - confirm the
     * type of _redisDatabase.
     */

    // The same member string is used for every set; compute it once.
    var member = registration.AgentId.ToString();

    // 1. Type index.
    var typeIndexKey = $"index:type:{registration.AgentType}";
    var addedToTypeIndex = await _redisDatabase.SetAddAsync(typeIndexKey, member);

    // 2. Worker-node index.
    var nodeIndexKey = $"index:node:{registration.WorkerNodeId}";
    var addedToNodeIndex = await _redisDatabase.SetAddAsync(nodeIndexKey, member);

    // 3. Capability indexes.
    foreach (var capability in registration.Capabilities)
    {
        var capabilityIndexKey = $"index:capability:{capability}";
        await _redisDatabase.SetAddAsync(capabilityIndexKey, member);
    }

    // 4. Status index.
    var statusIndexKey = $"index:status:{registration.Status}";
    await _redisDatabase.SetAddAsync(statusIndexKey, member);

    // 5. Statistics: count the agent once per type / node.
    if (addedToTypeIndex)
    {
        await _redisDatabase.HashIncrementAsync("stats:agents", registration.AgentType, 1);
    }

    if (addedToNodeIndex)
    {
        await _redisDatabase.HashIncrementAsync("stats:nodes", registration.WorkerNodeId, 1);
    }
}
  2. MessageRouter.RouteMessageAsync() - 消息路由
public async Task<RouteMessageResponse> RouteMessageAsync(RouteMessageRequest request)
{
    /*
     * Routes one message to its target agent.
     *
     * Flow:
     *  1. Parse request.TargetAgentId; an unparsable id yields INVALID_AGENT_ID.
     *  2. Look the agent up in the registry. When it is not found, ask the
     *     load balancer for another agent of the same type; when that also
     *     yields nothing, answer AGENT_NOT_FOUND.
     *  3. Reject agents whose status is not Online (AGENT_OFFLINE).
     *  4-5. Pick a routing strategy (DetermineRoutingStrategy) and run it
     *     (ExecuteRouting).
     *  6. Record the routing metric and return the routing result.
     *
     * Any exception is logged and reported as ROUTING_ERROR; the method itself
     * does not throw to its caller.
     */

    using var routeActivity = ActivitySource.StartActivity("MessageRouter.RouteMessage");
    routeActivity?.SetTag("message.type", request.MessageType);
    routeActivity?.SetTag("target.agent", request.TargetAgentId);

    // Builds the failure responses used on every early-exit path below.
    RouteMessageResponse Fail(string errorCode, string errorMessage)
    {
        return new RouteMessageResponse
        {
            Success = false,
            ErrorCode = errorCode,
            ErrorMessage = errorMessage
        };
    }

    try
    {
        // 1. Parse the target agent id.
        var parsed = AgentId.TryParse(request.TargetAgentId, out var destinationId);
        if (!parsed)
        {
            return Fail("INVALID_AGENT_ID", "无效的代理标识符");
        }

        // 2. Resolve the destination, falling back to the load balancer.
        var destination = await _agentRegistry.GetAgentAsync(destinationId);
        if (destination == null)
        {
            var substitute = await _loadBalancer.SelectAgentAsync(
                destinationId.Type, request.LoadBalancingStrategy);

            if (substitute == null)
            {
                return Fail("AGENT_NOT_FOUND", $"未找到代理: {destinationId}");
            }

            destination = substitute;
            destinationId = substitute.AgentId;
        }

        // 3. Only online agents can receive messages.
        if (destination.Status != AgentStatus.Online)
        {
            return Fail("AGENT_OFFLINE", $"目标代理离线: {destinationId}");
        }

        // 4-5. Choose the strategy and route.
        var outcome = await ExecuteRouting(
            DetermineRoutingStrategy(destination, request),
            destination,
            request);

        // 6. Record the routing metric.
        _metrics.RecordMessageRouted(
            request.MessageType,
            destination.WorkerNodeId,
            outcome.Success,
            outcome.Duration);

        return outcome;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "消息路由失败: {TargetAgent}", request.TargetAgentId);

        return Fail("ROUTING_ERROR", "消息路由失败");
    }
}

private async Task<RouteMessageResponse> ExecuteRouting(
    RoutingStrategy strategy,
    AgentRegistration targetAgent,
    RouteMessageRequest request)
{
    /*
     * Dispatches to the routine that implements the chosen routing strategy:
     *  - Direct       -> ExecuteDirectRouting
     *  - GrpcForward  -> ExecuteGrpcRouting
     *  - MessageQueue -> ExecuteMessageQueueRouting
     *  - WebSocket    -> ExecuteWebSocketRouting
     *
     * Throws NotSupportedException for any other strategy value.
     */

    return strategy switch
    {
        RoutingStrategy.Direct => await ExecuteDirectRouting(targetAgent, request),
        RoutingStrategy.GrpcForward => await ExecuteGrpcRouting(targetAgent, request),
        RoutingStrategy.MessageQueue => await ExecuteMessageQueueRouting(targetAgent, request),
        RoutingStrategy.WebSocket => await ExecuteWebSocketRouting(targetAgent, request),
        _ => throw new NotSupportedException($"不支持的路由策略: {strategy}")
    };
}

private async Task<RouteMessageResponse> ExecuteGrpcRouting(
    AgentRegistration targetAgent,
    RouteMessageRequest request)
{
    /*
     * Forwards a message to the worker node hosting the target agent via gRPC.
     *
     * Flow:
     *  1. Get the connection for the agent's worker node; when there is none,
     *     answer WORKER_NODE_UNAVAILABLE.
     *  2. Create a WorkerNodeService client on the connection's channel.
     *  3. Copy the routing request into a ForwardMessageRequest.
     *  4. Build the call options (deadline + cancellation token).
     *  5-6. Call ForwardMessageAsync and map the reply to a
     *     RouteMessageResponse.
     *  7. On RpcException, log a warning and map the gRPC status to an error
     *     code (MapGrpcStatusToErrorCode). Other exceptions propagate.
     *
     * Deadline handling: a positive request.Timeout (milliseconds) becomes an
     * absolute UTC deadline. A timeout of zero or less means "not set" and the
     * call is made without a deadline - previously it produced a deadline of
     * "now", which made the call fail immediately with DeadlineExceeded.
     *
     * NOTE(review): connection reuse, circuit breaking and retries are not
     * implemented in this method; presumably _connectionManager or a caller
     * provides them - confirm.
     */

    var workerNodeId = targetAgent.WorkerNodeId;

    // 1. Get the worker node connection.
    var workerConnection = await _connectionManager.GetConnectionAsync(workerNodeId);
    if (workerConnection == null)
    {
        return new RouteMessageResponse
        {
            Success = false,
            ErrorCode = "WORKER_NODE_UNAVAILABLE",
            ErrorMessage = $"工作节点不可用: {workerNodeId}"
        };
    }

    // 2. Create the gRPC client.
    var client = new WorkerNodeService.WorkerNodeServiceClient(workerConnection.Channel);

    // 3. Prepare the forward request.
    var forwardRequest = new ForwardMessageRequest
    {
        TargetAgentId = targetAgent.AgentId.ToString(),
        MessageType = request.MessageType,
        MessageData = request.MessageData,
        CorrelationId = request.CorrelationId,
        Timeout = request.Timeout,
        Metadata = { request.Metadata }
    };

    // 4. Build the call options; only a positive timeout yields a deadline.
    DateTime? deadline = request.Timeout > 0
        ? DateTime.UtcNow.AddMilliseconds(request.Timeout)
        : (DateTime?)null;

    var callOptions = new CallOptions(
        deadline: deadline,
        cancellationToken: request.CancellationToken);

    try
    {
        // 5. Perform the remote call.
        var response = await client.ForwardMessageAsync(forwardRequest, callOptions);

        // 6. Map the reply.
        return new RouteMessageResponse
        {
            Success = response.Success,
            ResponseData = response.ResponseData,
            ErrorCode = response.ErrorCode,
            ErrorMessage = response.ErrorMessage,
            Duration = response.ProcessingTime
        };
    }
    catch (RpcException ex)
    {
        _logger.LogWarning(ex, "gRPC调用失败: {WorkerNode}", workerNodeId);

        // 7. Translate the gRPC failure into a routing failure.
        return new RouteMessageResponse
        {
            Success = false,
            ErrorCode = MapGrpcStatusToErrorCode(ex.StatusCode),
            ErrorMessage = ex.Status.Detail
        };
    }
}

1.3 服务网关时序图

sequenceDiagram
    participant Client as 客户端
    participant Gateway as 网关服务
    participant Registry as 代理注册中心
    participant Router as 消息路由器
    participant Worker as 工作节点
    participant Agent as 目标代理
    
    %% 代理注册流程
    Worker->>Gateway: RegisterAgent(agentInfo)
    Gateway->>Registry: RegisterAgentAsync(registration)
    Registry->>Registry: 持久化到etcd/Redis
    Registry->>Registry: 更新索引和缓存
    Registry-->>Gateway: 注册成功
    Gateway-->>Worker: RegisterAgentResponse
    
    %% 消息路由流程
    Client->>Gateway: SendMessage(targetAgent, message)
    Gateway->>Registry: GetAgentAsync(targetAgentId)
    Registry-->>Gateway: AgentRegistration
    Gateway->>Router: RouteMessageAsync(request)
    Router->>Router: 确定路由策略
    
    alt 本地代理
        Router->>Agent: 直接调用
        Agent-->>Router: 响应
    else 远程代理
        Router->>Worker: gRPC转发
        Worker->>Agent: 本地调用
        Agent-->>Worker: 响应
        Worker-->>Router: gRPC响应
    end
    
    Router-->>Gateway: RouteMessageResponse
    Gateway-->>Client: 最终响应

2. 工作节点管理

2.1 工作节点架构

classDiagram
    class WorkerNode {
        +NodeId: string
        +Endpoint: string
        +Status: NodeStatus
        +Capabilities: List~string~
        +RegisteredAgents: List~AgentId~
        +StartAsync()
        +StopAsync()
        +RegisterAgent(agent)
        +UnregisterAgent(agentId)
    }
    
    class NodeManager {
        +RegisterNodeAsync(node)
        +UnregisterNodeAsync(nodeId)
        +GetNodeAsync(nodeId)
        +ListNodesAsync()
        +MonitorNodeHealth()
    }
    
    class HealthChecker {
        +StartHealthCheck(nodeId)
        +StopHealthCheck(nodeId)
        +CheckNodeHealth(nodeId)
        +HandleHealthCheckFailure(nodeId)
    }
    
    class LoadBalancer {
        +SelectNode(criteria)
        +UpdateNodeWeights()
        +GetNodeMetrics()
    }
    
    WorkerNode --> NodeManager : managed by
    NodeManager --> HealthChecker : uses
    NodeManager --> LoadBalancer : uses

2.2 工作节点核心实现

1. WorkerNode - 工作节点实现

public class WorkerNode : BackgroundService
{
    /*
     * Background service representing one worker node.
     *
     * What the visible code does:
     *  - creates and tracks local agents (RegisterAgentAsync) and registers
     *    them with the gateway;
     *  - handles messages forwarded by the gateway (ForwardMessageAsync);
     *  - sends periodic heartbeats to the gateway (SendHeartbeat);
     *  - periodically samples process and agent metrics (CollectMetrics);
     *  - runs the start-up sequence and main loop in ExecuteAsync.
     *
     * NOTE(review): both timers are created (and therefore started) in the
     * constructor, before the node has registered with the gateway, and no
     * code in this class stops or disposes them or _shutdownTokenSource.
     * Presumably CleanupAsync does - confirm.
     */

    private readonly IAgentRuntime _agentRuntime;
    private readonly IGatewayClient _gatewayClient;
    private readonly IConfiguration _configuration;
    private readonly ILogger<WorkerNode> _logger;
    private readonly ConcurrentDictionary<AgentId, IAgent> _localAgents;
    private readonly Timer _heartbeatTimer;
    private readonly Timer _metricsTimer;
    private readonly CancellationTokenSource _shutdownTokenSource;

    // Guards the message counters and the running average in Metrics, which
    // are updated from gRPC handlers that can run concurrently.
    private readonly object _metricsLock = new object();

    public string NodeId { get; }
    public string Endpoint { get; }
    public NodeStatus Status { get; private set; }
    public NodeMetrics Metrics { get; private set; }

    public WorkerNode(
        IAgentRuntime agentRuntime,
        IGatewayClient gatewayClient,
        IConfiguration configuration,
        ILogger<WorkerNode> logger)
    {
        _agentRuntime = agentRuntime;
        _gatewayClient = gatewayClient;
        _configuration = configuration;
        _logger = logger;

        // Node id = machine name plus the first 8 hex characters of a GUID.
        NodeId = Environment.MachineName + "_" + Guid.NewGuid().ToString("N")[..8];
        Endpoint = $"http://{Environment.MachineName}:{_configuration.GetValue<int>("Port")}";
        Status = NodeStatus.Initializing;
        Metrics = new NodeMetrics();

        _localAgents = new ConcurrentDictionary<AgentId, IAgent>();
        _shutdownTokenSource = new CancellationTokenSource();

        // Heartbeat timer; interval in milliseconds, default 30 s.
        var heartbeatInterval = _configuration.GetValue<int>("HeartbeatInterval", 30000);
        _heartbeatTimer = new Timer(SendHeartbeat, null, heartbeatInterval, heartbeatInterval);

        // Metrics timer; interval in milliseconds, default 60 s.
        var metricsInterval = _configuration.GetValue<int>("MetricsInterval", 60000);
        _metricsTimer = new Timer(CollectMetrics, null, metricsInterval, metricsInterval);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        /*
         * Main execution of the worker node:
         *  1. initialize the runtime;
         *  2. register with the gateway;
         *  3. start the gRPC service;
         *  4. mark the node as Running;
         *  5. run the main loop until it returns or is cancelled.
         *
         * Cancellation is logged as a normal shutdown; any other exception is
         * logged and sets Status to Failed. CleanupAsync always runs.
         */

        try
        {
            // 1. Initialize the runtime environment.
            await InitializeRuntimeAsync(stoppingToken);

            // 2. Register with the gateway.
            await RegisterToGatewayAsync(stoppingToken);

            // 3. Start the gRPC service.
            await StartGrpcServiceAsync(stoppingToken);

            // 4. The node is now running.
            Status = NodeStatus.Running;
            _logger.LogInformation("工作节点启动完成: {NodeId}", NodeId);

            // 5. Main loop.
            await RunMainLoopAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("工作节点正在关闭: {NodeId}", NodeId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "工作节点运行异常: {NodeId}", NodeId);
            Status = NodeStatus.Failed;
        }
        finally
        {
            // 6. Release resources.
            await CleanupAsync();
        }
    }

    public async Task<RegisterAgentResult> RegisterAgentAsync<TAgent>(
        string agentKey = null,
        Dictionary<string, string> metadata = null)
        where TAgent : class, IAgent
    {
        /*
         * Creates a local agent of type TAgent and registers it with the
         * gateway.
         *
         * Parameters:
         *  agentKey - key part of the agent id; a new GUID is used when null.
         *  metadata - optional metadata sent to the gateway; empty when null.
         *
         * Returns: a RegisterAgentResult with the agent id and the gateway
         * endpoint reported by the gateway.
         *
         * Throws: AgentRegistrationException when the gateway answers with
         * Success == false; any other exception from the runtime or the
         * gateway client is logged and rethrown.
         *
         * On every failure path the local state set up so far is rolled back
         * (previously this happened only for a Success == false response, so
         * an exception from the gateway call left the agent registered
         * locally but unknown to the gateway).
         */

        var agentType = typeof(TAgent).Name;
        var agentId = new AgentId
        {
            Type = agentType,
            Key = agentKey ?? Guid.NewGuid().ToString(),
            WorkerNodeId = NodeId
        };

        // Track what this call has set up so a failure undoes exactly that.
        var registeredWithRuntime = false;
        var addedToLocalAgents = false;

        try
        {
            // 1. Create the agent instance.
            var agent = _agentRuntime.CreateAgent<TAgent>(agentId);

            // 2. Run asynchronous initialization when the agent supports it.
            if (agent is IAsyncInitializable asyncInit)
            {
                await asyncInit.InitializeAsync(_shutdownTokenSource.Token);
            }

            // 3. Register with the local runtime.
            await _agentRuntime.RegisterAgentAsync(agent);
            registeredWithRuntime = true;

            // 4. Track the agent locally. TryAdd returns false when an entry
            //    with this id already exists; in that case the entry is not
            //    ours and must not be removed during rollback.
            addedToLocalAgents = _localAgents.TryAdd(agentId, agent);

            // 5. Register with the gateway.
            var registrationRequest = new RegisterAgentRequest
            {
                AgentType = agentType,
                AgentKey = agentId.Key,
                WorkerNodeId = NodeId,
                Capabilities = { GetAgentCapabilities<TAgent>() },
                Metadata = { metadata ?? new Dictionary<string, string>() },
                Version = GetAgentVersion<TAgent>(),
                HealthCheckConfig = GetHealthCheckConfig<TAgent>()
            };

            var registrationResponse = await _gatewayClient.RegisterAgentAsync(registrationRequest);

            if (!registrationResponse.Success)
            {
                throw new AgentRegistrationException(
                    $"代理注册失败: {registrationResponse.ErrorMessage}");
            }

            _logger.LogInformation("代理注册成功: {AgentId}", agentId);

            return new RegisterAgentResult
            {
                Success = true,
                AgentId = agentId,
                GatewayEndpoint = registrationResponse.GatewayEndpoint
            };
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "代理注册失败: {AgentType}", agentType);

            await RollbackLocalRegistrationAsync(agentId, addedToLocalAgents, registeredWithRuntime);
            throw;
        }
    }

    // Undoes the local side of a failed registration without masking the
    // original failure: a rollback error is logged, not thrown.
    private async Task RollbackLocalRegistrationAsync(
        AgentId agentId,
        bool removeFromLocalAgents,
        bool unregisterFromRuntime)
    {
        if (removeFromLocalAgents)
        {
            _localAgents.TryRemove(agentId, out _);
        }

        if (!unregisterFromRuntime)
        {
            return;
        }

        try
        {
            await _agentRuntime.UnregisterAgentAsync(agentId);
        }
        catch (Exception rollbackEx)
        {
            _logger.LogWarning(rollbackEx, "代理注册回滚失败: {AgentId}", agentId);
        }
    }

    [GrpcMethod]
    public async Task<ForwardMessageResponse> ForwardMessageAsync(
        ForwardMessageRequest request,
        ServerCallContext context)
    {
        /*
         * Handles a message forwarded by the gateway.
         *
         * Flow:
         *  1. Parse the target agent id (INVALID_AGENT_ID on failure).
         *  2. Find the agent among the local agents (AGENT_NOT_FOUND).
         *  3. Resolve the message type by name and deserialize the JSON
         *     payload into it (UNKNOWN_MESSAGE_TYPE when unresolved).
         *  4. Build the MessageContext.
         *  5. Pass the message to the agent.
         *  6. Serialize the agent's response to JSON.
         *  7. Update the processed counter and the average processing time.
         *
         * Any exception is logged, counted as a failed message and returned
         * as PROCESSING_ERROR with the exception message.
         */

        using var activity = ActivitySource.StartActivity("WorkerNode.ForwardMessage");
        activity?.SetTag("target.agent", request.TargetAgentId);
        activity?.SetTag("message.type", request.MessageType);

        var stopwatch = Stopwatch.StartNew();

        try
        {
            // 1. Parse the agent id.
            if (!AgentId.TryParse(request.TargetAgentId, out var targetAgentId))
            {
                return new ForwardMessageResponse
                {
                    Success = false,
                    ErrorCode = "INVALID_AGENT_ID",
                    ErrorMessage = "无效的代理标识符"
                };
            }

            // 2. Find the local agent.
            if (!_localAgents.TryGetValue(targetAgentId, out var targetAgent))
            {
                return new ForwardMessageResponse
                {
                    Success = false,
                    ErrorCode = "AGENT_NOT_FOUND",
                    ErrorMessage = $"本地未找到代理: {targetAgentId}"
                };
            }

            // 3. Resolve the message type and deserialize.
            // SECURITY NOTE(review): the type name comes from the remote
            // request, so the sender chooses which CLR type gets
            // instantiated. Consider restricting it to an allow-list of known
            // message types.
            var messageType = Type.GetType(request.MessageType);
            if (messageType == null)
            {
                return new ForwardMessageResponse
                {
                    Success = false,
                    ErrorCode = "UNKNOWN_MESSAGE_TYPE",
                    ErrorMessage = $"未知的消息类型: {request.MessageType}"
                };
            }

            var message = JsonSerializer.Deserialize(request.MessageData, messageType);

            // 4. Build the message context.
            var messageContext = new MessageContext
            {
                MessageId = request.CorrelationId,
                Sender = new AgentId("gateway", "system"),
                Recipient = targetAgentId,
                CancellationToken = context.CancellationToken,
                Properties = request.Metadata.ToDictionary(kv => kv.Key, kv => (object)kv.Value)
            };

            // 5. Hand the message to the target agent.
            var response = await targetAgent.HandleMessageAsync(message, messageContext);

            // 6. Serialize the response.
            var responseData = JsonSerializer.Serialize(response);

            stopwatch.Stop();
            var elapsedMs = stopwatch.ElapsedMilliseconds;

            // 7. Update metrics. The average is an incremental mean over all
            //    processed messages: avg += (sample - avg) / count. The former
            //    "(avg + sample) / 2" halved the first sample and weighted the
            //    latest sample at 50% regardless of history.
            lock (_metricsLock)
            {
                Metrics.MessagesProcessed++;
                Metrics.AverageProcessingTime = Metrics.AverageProcessingTime
                    + (elapsedMs - Metrics.AverageProcessingTime) / Metrics.MessagesProcessed;
            }

            return new ForwardMessageResponse
            {
                Success = true,
                ResponseData = responseData,
                ProcessingTime = elapsedMs
            };
        }
        catch (Exception ex)
        {
            stopwatch.Stop();

            _logger.LogError(ex, "消息转发失败: {TargetAgent}", request.TargetAgentId);

            lock (_metricsLock)
            {
                Metrics.MessagesFailed++;
            }

            return new ForwardMessageResponse
            {
                Success = false,
                ErrorCode = "PROCESSING_ERROR",
                ErrorMessage = ex.Message,
                ProcessingTime = stopwatch.ElapsedMilliseconds
            };
        }
    }

    private async void SendHeartbeat(object state)
    {
        /*
         * Timer callback that sends one heartbeat to the gateway.
         *
         * The heartbeat carries the node id, status, a UTC timestamp in Unix
         * milliseconds, a snapshot of the metrics and the ids of all local
         * agents.
         *
         * The method is async void because Timer callbacks must return void;
         * every exception is caught and logged so none can escape.
         */

        try
        {
            var heartbeatRequest = new HeartbeatRequest
            {
                NodeId = NodeId,
                Status = Status.ToString(),
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                Metrics = new NodeMetricsProto
                {
                    CpuUsage = Metrics.CpuUsage,
                    MemoryUsage = Metrics.MemoryUsage,
                    MessagesProcessed = Metrics.MessagesProcessed,
                    MessagesFailed = Metrics.MessagesFailed,
                    AverageProcessingTime = Metrics.AverageProcessingTime
                },
                Agents = { _localAgents.Keys.Select(id => id.ToString()) }
            };

            var response = await _gatewayClient.SendHeartbeatAsync(heartbeatRequest);

            if (!response.Success)
            {
                _logger.LogWarning("心跳发送失败: {Error}", response.ErrorMessage);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "心跳发送异常");
        }
    }

    private void CollectMetrics(object state)
    {
        /*
         * Timer callback that samples node metrics:
         *  - process: CPU usage, working set, thread count;
         *  - agents: total local agents and how many are online;
         *  - network: connection count.
         *
         * Failures are logged and otherwise ignored.
         */

        try
        {
            // 1. Process-level metrics. Process is disposable; release its
            //    handle on every tick instead of leaking one per interval.
            using var process = Process.GetCurrentProcess();

            Metrics.CpuUsage = GetCpuUsage();
            Metrics.MemoryUsage = process.WorkingSet64;
            Metrics.ThreadCount = process.Threads.Count;

            // 2. Agent metrics.
            Metrics.ActiveAgents = _localAgents.Count;
            Metrics.OnlineAgents = _localAgents.Values.Count(a => IsAgentOnline(a));

            // 3. Network metrics.
            Metrics.NetworkConnections = GetNetworkConnections();

            _logger.LogDebug("指标收集完成: CPU={CpuUsage}%, Memory={MemoryUsage}MB, Agents={AgentCount}",
                Metrics.CpuUsage, Metrics.MemoryUsage / 1024 / 1024, Metrics.ActiveAgents);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "指标收集失败");
        }
    }
}

3. 负载均衡和故障恢复

3.1 负载均衡策略

graph TD
    subgraph "负载均衡策略"
        LB[LoadBalancer]
        
        subgraph "策略类型"
            RR[轮询 Round Robin]
            WRR[加权轮询 Weighted Round Robin]
            LC[最少连接 Least Connections]
            RT[响应时间 Response Time]
            HASH[一致性哈希 Consistent Hash]
        end
        
        subgraph "健康检查"
            HC[HealthChecker]
            HM[HealthMonitor]
            FB[Failback机制]
        end
        
        subgraph "指标收集"
            MC[MetricsCollector]
            PM[PerformanceMonitor]
            AM[AlertManager]
        end
    end
    
    LB --> RR
    LB --> WRR
    LB --> LC
    LB --> RT
    LB --> HASH
    
    LB --> HC
    HC --> HM
    HC --> FB
    
    LB --> MC
    MC --> PM
    MC --> AM

3.2 负载均衡核心实现

1. LoadBalancer - 负载均衡器

/// <summary>
/// Picks one agent of a requested type from the registry, skipping agents that fail the
/// health check and applying a caller-chosen load-balancing strategy to the rest.
/// </summary>
/// <remarks>
/// Strategies dispatched by <see cref="SelectAgentAsync"/>: round robin, weighted round robin,
/// least connections, lowest average response time, hash of a caller-supplied key, and random.
/// Rotation state is kept per agent type in <c>_balancingStates</c>; every read or write of a
/// state object happens inside <c>lock (state)</c>.
/// NOTE(review): <c>SelectByRandom</c>, <c>UpdateSelectionStats</c>, <c>ExtractHashKey</c> and
/// <c>ComputeHash</c> are referenced here but are not part of this excerpt.
/// </remarks>
public class LoadBalancer : ILoadBalancer
{
    /// <summary>Base weight used when an agent's metadata has no parsable "weight" entry.</summary>
    private const int DefaultBaseWeight = 100;

    /// <summary>Maximum age of a cached weight table before it is rebuilt from fresh metrics.</summary>
    private static readonly TimeSpan WeightRefreshInterval = TimeSpan.FromMinutes(5);

    private readonly IAgentRegistry _agentRegistry;
    private readonly IHealthChecker _healthChecker;
    private readonly IMetricsCollector _metricsCollector;
    private readonly ILogger<LoadBalancer> _logger;
    private readonly ConcurrentDictionary<string, LoadBalancingState> _balancingStates;

    /// <summary>Creates a load balancer over the given registry, health checker and metrics source.</summary>
    public LoadBalancer(
        IAgentRegistry agentRegistry,
        IHealthChecker healthChecker,
        IMetricsCollector metricsCollector,
        ILogger<LoadBalancer> logger)
    {
        _agentRegistry = agentRegistry;
        _healthChecker = healthChecker;
        _metricsCollector = metricsCollector;
        _logger = logger;
        _balancingStates = new ConcurrentDictionary<string, LoadBalancingState>();
    }

    /// <summary>
    /// Selects one healthy agent of <paramref name="agentType"/> using <paramref name="strategy"/>.
    /// </summary>
    /// <param name="agentType">Agent type to look up in the registry.</param>
    /// <param name="strategy">Load-balancing strategy; defaults to round robin.</param>
    /// <param name="criteria">Extra selection input; only the hash strategy reads it (to derive the hash key).</param>
    /// <returns>
    /// The selected registration, or <c>null</c> when no agent of that type exists, none is
    /// healthy, or any exception occurs (including an unsupported strategy). Exceptions are
    /// logged, never propagated.
    /// </returns>
    public async Task<AgentRegistration> SelectAgentAsync(
        string agentType,
        LoadBalancingStrategy strategy = LoadBalancingStrategy.RoundRobin,
        Dictionary<string, object> criteria = null)
    {
        try
        {
            // 1. All registered agents of the requested type.
            var candidates = await _agentRegistry.GetAgentsByTypeAsync(agentType);

            if (!candidates.Any())
            {
                _logger.LogWarning("未找到类型为 {AgentType} 的代理", agentType);
                return null;
            }

            // 2. Keep only agents that currently pass the health check (checked one at a time).
            var healthyAgents = new List<AgentRegistration>();

            foreach (var agent in candidates)
            {
                var healthStatus = await _healthChecker.CheckAgentHealthAsync(agent.AgentId);
                if (healthStatus.IsHealthy)
                {
                    healthyAgents.Add(agent);
                }
            }

            if (!healthyAgents.Any())
            {
                _logger.LogWarning("类型为 {AgentType} 的代理都不健康", agentType);
                return null;
            }

            // 3. Apply the requested strategy to the healthy subset.
            var selectedAgent = strategy switch
            {
                LoadBalancingStrategy.RoundRobin => SelectByRoundRobin(agentType, healthyAgents),
                LoadBalancingStrategy.WeightedRoundRobin => await SelectByWeightedRoundRobin(agentType, healthyAgents),
                LoadBalancingStrategy.LeastConnections => await SelectByLeastConnections(healthyAgents),
                LoadBalancingStrategy.ResponseTime => await SelectByResponseTime(healthyAgents),
                LoadBalancingStrategy.ConsistentHash => SelectByConsistentHash(agentType, healthyAgents, criteria),
                LoadBalancingStrategy.Random => SelectByRandom(healthyAgents),
                _ => throw new NotSupportedException($"不支持的负载均衡策略: {strategy}")
            };

            // 4. Record the selection.
            if (selectedAgent != null)
            {
                await UpdateSelectionStats(selectedAgent, strategy);
            }

            return selectedAgent;
        }
        catch (Exception ex)
        {
            // Deliberate best effort: callers receive null instead of an exception.
            _logger.LogError(ex, "代理选择失败: {AgentType}", agentType);
            return null;
        }
    }

    /// <summary>
    /// Creates the per-agent-type state. Every strategy uses this one factory so that a state
    /// first created by the round-robin path also has a non-null weight table when the
    /// weighted path later reuses it.
    /// </summary>
    private static LoadBalancingState CreateState()
    {
        return new LoadBalancingState
        {
            CurrentIndex = 0,
            WeightedAgents = new List<WeightedAgent>()
        };
    }

    /// <summary>
    /// Round robin: returns the agent at the stored cursor for <paramref name="agentType"/>
    /// and advances the cursor, wrapping at <c>agents.Count</c>.
    /// </summary>
    /// <param name="agentType">Key of the rotation state.</param>
    /// <param name="agents">Non-empty list to rotate over.</param>
    private AgentRegistration SelectByRoundRobin(
        string agentType,
        List<AgentRegistration> agents)
    {
        var state = _balancingStates.GetOrAdd(agentType, _ => CreateState());

        lock (state)
        {
            // The modulo is applied on read as well, because the list may be shorter than it
            // was on the previous call.
            var index = state.CurrentIndex % agents.Count;
            state.CurrentIndex = (index + 1) % agents.Count;

            return agents[index];
        }
    }

    /// <summary>
    /// Weighted round robin: the agent with the highest running weight wins, is then
    /// penalised by the sum of all running weights, and every entry regains its own static
    /// weight. Heavier agents are therefore picked more often, interleaved with lighter ones.
    /// </summary>
    /// <remarks>
    /// The weight table is rebuilt when its size differs from <paramref name="agents"/> or when
    /// it is older than <see cref="WeightRefreshInterval"/>.
    /// NOTE(review): staleness is detected by count only. If one agent is replaced by another
    /// while the count stays the same, the cached table can still return the old agent until
    /// the next refresh.
    /// </remarks>
    private async Task<AgentRegistration> SelectByWeightedRoundRobin(
        string agentType,
        List<AgentRegistration> agents)
    {
        var state = _balancingStates.GetOrAdd(agentType, _ => CreateState());

        // 1. Decide under the lock whether the table is stale.
        bool refreshNeeded;
        lock (state)
        {
            refreshNeeded =
                state.WeightedAgents.Count != agents.Count ||
                DateTime.UtcNow - state.LastWeightUpdate > WeightRefreshInterval;
        }

        // 2. Fetch metrics OUTSIDE the lock: 'await' is not permitted inside a lock statement
        //    (CS1996), and holding a monitor across I/O would stall every other selection of
        //    this agent type. Concurrent callers may both rebuild; the last one to apply wins.
        List<WeightedAgent> rebuilt = null;
        if (refreshNeeded)
        {
            rebuilt = await BuildWeightedAgents(agents);
        }

        lock (state)
        {
            if (rebuilt != null)
            {
                state.WeightedAgents.Clear();
                foreach (var entry in rebuilt)
                {
                    state.WeightedAgents.Add(entry);
                }

                state.LastWeightUpdate = DateTime.UtcNow;
            }

            // 3. Sum of the running weights.
            var totalWeight = state.WeightedAgents.Sum(wa => wa.CurrentWeight);
            if (totalWeight <= 0)
            {
                // No usable weights: fall back to plain round robin. Monitor locks are
                // re-entrant, so taking the same state lock again inside is safe.
                return SelectByRoundRobin(agentType, agents);
            }

            // 4. Highest running weight wins; on ties the earliest entry is kept.
            WeightedAgent winner = null;
            foreach (var entry in state.WeightedAgents)
            {
                if (winner == null || entry.CurrentWeight > winner.CurrentWeight)
                {
                    winner = entry;
                }
            }

            // 5. Penalise the winner, then let every entry regain its static weight.
            winner.CurrentWeight -= totalWeight;
            foreach (var entry in state.WeightedAgents)
            {
                entry.CurrentWeight += entry.Weight;
            }

            return winner.Agent;
        }
    }

    /// <summary>
    /// Least connections: returns the agent whose metrics report the fewest active
    /// connections. Agents without metrics count as 0 connections; ties are broken randomly.
    /// </summary>
    private async Task<AgentRegistration> SelectByLeastConnections(
        List<AgentRegistration> agents)
    {
        var scored = new List<(AgentRegistration Agent, int Connections)>(agents.Count);

        // 1. Current connection count of every agent.
        foreach (var agent in agents)
        {
            var metrics = await _metricsCollector.GetAgentMetricsAsync(agent.AgentId);
            scored.Add((agent, metrics?.ActiveConnections ?? 0));
        }

        // 2. Lowest count first; a fresh Guid per element shuffles equal counts.
        return scored
            .OrderBy(entry => entry.Connections)
            .ThenBy(_ => Guid.NewGuid())
            .First()
            .Agent;
    }

    /// <summary>
    /// Response time: returns the agent with the lowest reported average response time.
    /// Agents without metrics sort last (<c>double.MaxValue</c>); ties are broken randomly.
    /// </summary>
    private async Task<AgentRegistration> SelectByResponseTime(
        List<AgentRegistration> agents)
    {
        var scored = new List<(AgentRegistration Agent, double ResponseTime)>(agents.Count);

        // 1. Average response time of every agent.
        foreach (var agent in agents)
        {
            var metrics = await _metricsCollector.GetAgentMetricsAsync(agent.AgentId);
            scored.Add((agent, metrics?.AverageResponseTime ?? double.MaxValue));
        }

        // 2. Fastest first; a fresh Guid per element shuffles equal times.
        return scored
            .OrderBy(entry => entry.ResponseTime)
            .ThenBy(_ => Guid.NewGuid())
            .First()
            .Agent;
    }

    /// <summary>
    /// Hash-based selection: maps the key extracted from <paramref name="criteria"/> to
    /// <c>agents[hash % agents.Count]</c>.
    /// </summary>
    /// <remarks>
    /// The same key maps to the same index only while <paramref name="agents"/> keeps the same
    /// length and order. This is modulo hashing, not a hash ring: adding or removing an agent
    /// remaps most keys.
    /// When no key can be extracted the call falls back to round robin for
    /// <paramref name="agentType"/>, so the fallback rotates per agent type instead of sharing
    /// one global counter across all types.
    /// </remarks>
    private AgentRegistration SelectByConsistentHash(
        string agentType,
        List<AgentRegistration> agents,
        Dictionary<string, object> criteria)
    {
        // 1. Derive the hash key.
        var hashKey = ExtractHashKey(criteria);
        if (string.IsNullOrEmpty(hashKey))
        {
            return SelectByRoundRobin(agentType, agents);
        }

        // 2. Hash it and 3. map the hash onto the list.
        var hash = ComputeHash(hashKey);
        var index = (int)(hash % (uint)agents.Count);
        return agents[index];
    }

    /// <summary>
    /// Builds a fresh weight table for <paramref name="agents"/> from their current metrics.
    /// </summary>
    /// <remarks>
    /// Final weight = base weight (metadata "weight", default <see cref="DefaultBaseWeight"/>)
    /// x performance multiplier x health multiplier, truncated to an integer and clamped to
    /// at least 1. The method touches no shared state, so it can be awaited outside any lock.
    /// </remarks>
    private async Task<List<WeightedAgent>> BuildWeightedAgents(
        List<AgentRegistration> agents)
    {
        var table = new List<WeightedAgent>(agents.Count);

        foreach (var agent in agents)
        {
            var metrics = await _metricsCollector.GetAgentMetricsAsync(agent.AgentId);

            // 1. Configured base weight. Parsed culture-invariantly because the value is
            //    configuration data, not user-facing text.
            var baseWeight = DefaultBaseWeight;
            if (agent.Metadata.TryGetValue("weight", out var weightStr) &&
                int.TryParse(
                    weightStr,
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var configuredWeight))
            {
                baseWeight = configuredWeight;
            }

            // 2. and 3. Scale by performance and health.
            var performanceMultiplier = CalculatePerformanceMultiplier(metrics);
            var healthMultiplier = CalculateHealthMultiplier(metrics);

            // 4. Never let an agent drop to zero (or below): it must stay selectable.
            var finalWeight = Math.Max(1, (int)(baseWeight * performanceMultiplier * healthMultiplier));

            table.Add(new WeightedAgent
            {
                Agent = agent,
                Weight = finalWeight,
                CurrentWeight = finalWeight
            });
        }

        return table;
    }

    /// <summary>
    /// Performance multiplier = CPU factor x memory factor x response-time factor, where each
    /// factor is floored at 0.1. Returns 0.5 when no metrics are available.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the formula treats <c>CpuUsage</c> and <c>MemoryUsage</c> as percentages
    /// (0-100) and <c>AverageResponseTime</c> as milliseconds. Confirm the units; a
    /// byte-valued memory figure would pin the memory factor at 0.1.
    /// </remarks>
    private static double CalculatePerformanceMultiplier(AgentMetrics metrics)
    {
        if (metrics == null)
        {
            return 0.5;
        }

        var cpuFactor = Math.Max(0.1, 1.0 - metrics.CpuUsage / 100.0);
        var memoryFactor = Math.Max(0.1, 1.0 - metrics.MemoryUsage / 100.0);
        var responseTimeFactor = Math.Max(0.1, 1.0 / (1.0 + metrics.AverageResponseTime / 1000.0));

        return cpuFactor * memoryFactor * responseTimeFactor;
    }

    /// <summary>
    /// Health multiplier = max(0.1, 1 - error rate) x availability factor (1.0 when available,
    /// otherwise 0.1). Returns 0.5 when no metrics are available.
    /// </summary>
    private static double CalculateHealthMultiplier(AgentMetrics metrics)
    {
        if (metrics == null)
        {
            return 0.5;
        }

        // With no requests recorded the error rate is taken as 0 (avoids division by zero).
        var errorRate = metrics.TotalRequests > 0
            ? (double)metrics.FailedRequests / metrics.TotalRequests
            : 0.0;

        var errorFactor = Math.Max(0.1, 1.0 - errorRate);
        var availabilityFactor = metrics.IsAvailable ? 1.0 : 0.1;

        return errorFactor * availabilityFactor;
    }
}

4. 监控和可观测性

4.1 监控架构

graph TB
    subgraph "监控和可观测性架构"
        subgraph "指标收集 (Metrics Collection)"
            MC[MetricsCollector - 指标收集器]
            PM[PrometheusMetrics - Prometheus指标]
            CM[CustomMetrics - 自定义指标]
        end
        
        subgraph "链路追踪 (Distributed Tracing)"
            AT[ActivitySource - 活动源]
            JT[JaegerTracing - Jaeger追踪]
            OT[OpenTelemetry - 开放遥测]
        end
        
        subgraph "日志聚合 (Log Aggregation)"
            SL[StructuredLogging - 结构化日志]
            LA[LogAggregator - 日志聚合器]
            LS[LogShipping - 日志传输]
        end
        
        subgraph "健康检查 (Health Checks)"
            HC[HealthChecker - 健康检查器]
            HM[HealthMonitor - 健康监控]
            AM[AlertManager - 告警管理]
        end
        
        subgraph "可视化 (Visualization)"
            GD[Grafana Dashboard - Grafana仪表板]
            KD[Kibana Dashboard - Kibana仪表板]
            CD[Custom Dashboard - 自定义仪表板]
        end
    end
    
    MC --> PM
    MC --> CM
    
    AT --> JT
    AT --> OT
    
    SL --> LA
    LA --> LS
    
    HC --> HM
    HM --> AM
    
    PM --> GD
    LA --> KD
    AM --> CD

4.2 监控核心实现

1. MetricsCollector - 指标收集器

/// <summary>
/// Collects request, agent and process metrics, mirrors part of them into Prometheus
/// instruments, keeps the latest value of every metric in an in-memory table, and
/// periodically pushes that table to the configured exporters.
/// </summary>
/// <remarks>
/// The in-memory table holds only the most recent value per key; no history is kept.
/// A timer created in the constructor drives the periodic collection.
/// NOTE(review): this excerpt shows neither the <c>IHostedService</c> members nor the helpers
/// <c>GetCpuUsageAsync</c>, <c>CollectApplicationMetrics</c>, <c>CollectNetworkMetrics</c>,
/// <c>AddComputedMetrics</c> and <c>DetermineMetricType</c>. Nothing visible here disposes
/// <c>_collectionTimer</c>; confirm that shutdown code does.
/// </remarks>
public class MetricsCollector : IMetricsCollector, IHostedService
{
    private readonly ILogger<MetricsCollector> _logger;
    private readonly IConfiguration _configuration;
    private readonly Timer _collectionTimer;
    private readonly ConcurrentDictionary<string, MetricValue> _metrics;
    private readonly List<IMetricsExporter> _exporters;

    // Prometheus instruments.
    private readonly Counter _requestsTotal;
    private readonly Counter _requestsFailedTotal;
    private readonly Histogram _requestDuration;
    private readonly Gauge _activeConnections;
    private readonly Gauge _agentCount;

    // 1 while a collection cycle is running, 0 otherwise. Accessed only through Interlocked.
    private int _collecting;

    /// <summary>
    /// Creates the Prometheus instruments and starts the collection timer.
    /// </summary>
    /// <param name="logger">Logger for collection and export diagnostics.</param>
    /// <param name="configuration">
    /// Source of <c>Metrics:CollectionInterval</c>, the timer period in milliseconds (default 30000).
    /// </param>
    /// <param name="exporters">Exporters that receive a snapshot after every collection cycle.</param>
    public MetricsCollector(
        ILogger<MetricsCollector> logger,
        IConfiguration configuration,
        IEnumerable<IMetricsExporter> exporters)
    {
        _logger = logger;
        _configuration = configuration;
        _metrics = new ConcurrentDictionary<string, MetricValue>();
        _exporters = exporters.ToList();

        _requestsTotal = Metrics.CreateCounter(
            "autogen_requests_total",
            "Total number of requests processed",
            new[] { "agent_type", "status" });

        _requestsFailedTotal = Metrics.CreateCounter(
            "autogen_requests_failed_total",
            "Total number of failed requests",
            new[] { "agent_type", "error_type" });

        _requestDuration = Metrics.CreateHistogram(
            "autogen_request_duration_seconds",
            "Request processing duration in seconds",
            new[] { "agent_type", "method" });

        _activeConnections = Metrics.CreateGauge(
            "autogen_active_connections",
            "Number of active connections",
            new[] { "node_id" });

        _agentCount = Metrics.CreateGauge(
            "autogen_agents_count",
            "Number of registered agents",
            new[] { "agent_type", "status" });

        // The first tick fires after one full interval, then repeats at the same period.
        var interval = _configuration.GetValue<int>("Metrics:CollectionInterval", 30000);
        _collectionTimer = new Timer(CollectMetrics, null, interval, interval);
    }

    /// <summary>
    /// Records one processed request.
    /// </summary>
    /// <param name="agentType">Agent type label.</param>
    /// <param name="status">Outcome label; the exact value <c>"failed"</c> also bumps the failure counter.</param>
    /// <param name="duration">
    /// Processing time. It is divided by 1000 before being observed by the seconds histogram,
    /// i.e. it is treated as milliseconds.
    /// </param>
    public void RecordRequest(string agentType, string status, double duration)
    {
        var now = DateTimeOffset.UtcNow;

        // 1. Prometheus instruments.
        _requestsTotal.WithLabels(agentType, status).Inc();
        _requestDuration.WithLabels(agentType, "process").Observe(duration / 1000.0);

        if (status == "failed")
        {
            _requestsFailedTotal.WithLabels(agentType, "unknown").Inc();
        }

        // 2. Internal request counter per (agent type, status).
        _metrics.AddOrUpdate(
            $"requests.{agentType}.{status}",
            new MetricValue { Value = 1, Timestamp = now },
            (_, previous) => new MetricValue { Value = previous.Value + 1, Timestamp = now });

        // 3. Smoothed duration per agent type. Averaging the previous value with the new
        //    sample is an exponential moving average with weight 0.5 (each older sample
        //    counts half as much as the next), not an arithmetic mean over all samples.
        _metrics.AddOrUpdate(
            $"duration.{agentType}",
            new MetricValue { Value = duration, Timestamp = now },
            (_, previous) => new MetricValue { Value = (previous.Value + duration) / 2, Timestamp = now });
    }

    /// <summary>
    /// Stores the latest metrics reported for one agent.
    /// </summary>
    /// <param name="agentId">Agent identity; its type and worker node id become gauge labels.</param>
    /// <param name="metrics">Reported figures.</param>
    public void RecordAgentMetrics(AgentId agentId, AgentMetrics metrics)
    {
        var now = DateTimeOffset.UtcNow;
        var agentType = agentId.Type;
        var nodeId = agentId.WorkerNodeId;

        // 1. Agent gauge.
        // NOTE(review): this sets the (type, status) series to 1 on every report instead of
        // counting agents, so "autogen_agents_count" never exceeds 1 per label pair. Confirm
        // whether a real count is intended.
        _agentCount.WithLabels(agentType, metrics.Status.ToString()).Set(1);

        // 2. Connection gauge, labelled by worker node.
        _activeConnections.WithLabels(nodeId).Set(metrics.ActiveConnections);

        // 3. Detailed per-agent values in the internal table.
        var prefix = $"agent.{agentId}";
        SetMetric($"{prefix}.cpu_usage", metrics.CpuUsage, now);
        SetMetric($"{prefix}.memory_usage", metrics.MemoryUsage, now);
        SetMetric($"{prefix}.messages_processed", metrics.MessagesProcessed, now);
        SetMetric($"{prefix}.messages_failed", metrics.MessagesFailed, now);
        SetMetric($"{prefix}.average_response_time", metrics.AverageResponseTime, now);
    }

    /// <summary>
    /// Returns the metrics currently held in memory.
    /// </summary>
    /// <param name="pattern">
    /// Optional substring filter; only keys containing it are returned. <c>null</c> or empty disables the filter.
    /// </param>
    /// <param name="since">
    /// Only metrics updated at or after this instant are returned; defaults to one hour ago.
    /// </param>
    /// <returns>
    /// A dictionary mapping each metric key to an object with <c>value</c>, <c>timestamp</c>
    /// and <c>type</c>, plus whatever <c>AddComputedMetrics</c> appends.
    /// </returns>
    public async Task<Dictionary<string, object>> GetMetricsAsync(
        string pattern = null,
        DateTimeOffset? since = null)
    {
        var result = new Dictionary<string, object>();
        var sinceTime = since ?? DateTimeOffset.UtcNow.AddHours(-1);
        var hasPattern = !string.IsNullOrEmpty(pattern);

        foreach (var kvp in _metrics)
        {
            var key = kvp.Key;
            var value = kvp.Value;

            // 1. Key filter.
            if (hasPattern && !key.Contains(pattern))
            {
                continue;
            }

            // 2. Freshness filter.
            if (value.Timestamp < sinceTime)
            {
                continue;
            }

            // 3. Project into the response shape.
            result[key] = new
            {
                value = value.Value,
                timestamp = value.Timestamp,
                type = DetermineMetricType(key)
            };
        }

        // 4. Derived metrics.
        await AddComputedMetrics(result);

        return result;
    }

    /// <summary>
    /// Timer callback: runs one collection cycle (system, application, network) and exports the result.
    /// </summary>
    /// <param name="state">Timer state; unused.</param>
    /// <remarks>
    /// <c>TimerCallback</c> requires a <c>void</c> return, hence <c>async void</c>. That is
    /// tolerable only because every exception is caught and logged below. The timer can fire
    /// again while a slow cycle is still running; such a tick is skipped so cycles never overlap.
    /// </remarks>
    private async void CollectMetrics(object state)
    {
        if (Interlocked.CompareExchange(ref _collecting, 1, 0) != 0)
        {
            // The previous cycle has not finished yet; drop this tick.
            return;
        }

        try
        {
            await CollectSystemMetrics();
            await CollectApplicationMetrics();
            await CollectNetworkMetrics();
            await ExportMetrics();

            _logger.LogDebug("指标收集完成,共收集 {Count} 个指标", _metrics.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "指标收集失败");
        }
        finally
        {
            Interlocked.Exchange(ref _collecting, 0);
        }
    }

    /// <summary>
    /// Samples process-level figures: CPU usage, working set, GC heap size, per-generation
    /// GC collection counts, OS thread count and thread-pool thread count.
    /// </summary>
    private async Task CollectSystemMetrics()
    {
        // Process is IDisposable; dispose it so that every tick does not leave an undisposed
        // instance behind.
        using var process = Process.GetCurrentProcess();

        // CPU.
        var cpuUsage = await GetCpuUsageAsync();

        // One timestamp for everything sampled after the CPU measurement.
        var now = DateTimeOffset.UtcNow;
        SetMetric("system.cpu.usage", cpuUsage, now);

        // Memory, in bytes.
        SetMetric("system.memory.working_set", process.WorkingSet64, now);
        SetMetric("system.memory.gc_total", GC.GetTotalMemory(false), now);

        // Cumulative number of collections per GC generation.
        for (int generation = 0; generation <= GC.MaxGeneration; generation++)
        {
            SetMetric($"system.gc.gen{generation}_collections", GC.CollectionCount(generation), now);
        }

        // Threads.
        SetMetric("system.threads.count", process.Threads.Count, now);
        SetMetric("system.threads.pool_worker", ThreadPool.ThreadCount, now);
    }

    /// <summary>
    /// Sends one snapshot of the in-memory table to every exporter concurrently.
    /// A failing exporter is logged and does not affect the others.
    /// </summary>
    private async Task ExportMetrics()
    {
        if (_exporters.Count == 0)
        {
            return;
        }

        var exportTasks = _exporters.Select(async exporter =>
        {
            try
            {
                // Each exporter gets its own dictionary (an exporter may mutate what it is
                // handed), built inside the try so a failure stays scoped to that exporter.
                var metricsToExport = _metrics.ToDictionary(
                    kvp => kvp.Key,
                    kvp => (object)kvp.Value);

                await exporter.ExportAsync(metricsToExport);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "指标导出失败: {ExporterType}", exporter.GetType().Name);
            }
        }).ToList(); // materialise so every export is started before awaiting

        await Task.WhenAll(exportTasks);
    }

    /// <summary>Stores <paramref name="value"/> under <paramref name="key"/>, replacing any previous value.</summary>
    private void SetMetric(string key, double value, DateTimeOffset timestamp)
    {
        _metrics[key] = new MetricValue { Value = value, Timestamp = timestamp };
    }
}

本文对服务网关模块进行了完整分析,涵盖架构解析、关键API实现、负载均衡策略和监控机制。通过这些内容,开发者可以全面理解AutoGen分布式系统的核心机制和最佳实践。