AutoGen-04-协议模块-概览

模块职责

核心职责

AutoGen协议模块定义了跨语言、跨进程的统一通信协议,基于gRPC和CloudEvents标准,实现多代理系统的互操作性。主要职责包括:

  • 统一通信协议:定义Agent Worker Protocol,支持Python和.NET代理互通
  • 消息标准化:基于CloudEvents v1.0规范,确保消息格式一致性
  • 服务发现机制:代理注册、订阅管理、路由规则定义
  • 跨语言互操作:Protocol Buffers序列化,支持多语言客户端
  • 分布式支持:支持代理在不同进程、机器上运行

输入与输出

  • 输入:代理注册请求、消息发送请求、订阅配置、控制指令
  • 输出:路由后的消息、代理状态响应、订阅确认、错误信息

上下游依赖

  • 上游依赖:gRPC.NET/grpcio、Protocol Buffers、CloudEvents规范
  • 下游调用方:分布式代理运行时、跨语言代理实例、服务网关

协议架构图

flowchart TB
    subgraph "客户端层"
        PythonClient[Python代理客户端]
        DotNetClient[.NET代理客户端]
        JavaClient[Java代理客户端]
        JSClient[JavaScript客户端]
    end
    
    subgraph "协议定义层"
        subgraph "agent_worker.proto"
            AgentRpcService[AgentRpc服务定义]
            MessageTypes[消息类型定义<br/>RpcRequest/Response]
            SubscriptionTypes[订阅类型<br/>TypeSubscription]
            ControlTypes[控制消息<br/>SaveState/LoadState]
        end
        
        subgraph "cloudevent.proto"
            CloudEventDef[CloudEvent定义<br/>基于CNCF规范]
            AttributeTypes[属性值类型<br/>string/bytes/timestamp等]
        end
    end
    
    subgraph "网络传输层"
        gRPCChannels[gRPC双向流<br/>HTTP/2 + Protobuf]
        TLSEncryption[TLS 1.3加密传输]
        LoadBalancing[负载均衡<br/>客户端/服务端]
    end
    
    subgraph "消息路由层"
        MessageQueue[消息队列缓冲]
        RoutingEngine[路由引擎<br/>基于订阅规则]
        MessageTransform[消息转换器<br/>格式适配]
    end
    
    subgraph "状态管理层"
        AgentRegistry[代理注册表<br/>类型→实例映射]
        SubscriptionManager[订阅管理器<br/>主题→代理映射]
        StateStore[状态存储<br/>代理状态持久化]
    end
    
    PythonClient --> gRPCChannels
    DotNetClient --> gRPCChannels
    JavaClient --> gRPCChannels
    JSClient --> gRPCChannels
    
    gRPCChannels --> AgentRpcService
    AgentRpcService --> MessageTypes
    AgentRpcService --> CloudEventDef
    
    gRPCChannels --> TLSEncryption
    TLSEncryption --> LoadBalancing
    
    LoadBalancing --> MessageQueue
    MessageQueue --> RoutingEngine
    RoutingEngine --> MessageTransform
    
    RoutingEngine --> AgentRegistry
    RoutingEngine --> SubscriptionManager
    MessageTransform --> StateStore

核心协议定义

Agent Worker Protocol服务

service AgentRpc {
    // 双向消息流通道
    rpc OpenChannel (stream Message) returns (stream Message);
    
    // 控制通道(状态管理)
    rpc OpenControlChannel (stream ControlMessage) returns (stream ControlMessage);
    
    // 代理生命周期管理
    rpc RegisterAgent(RegisterAgentTypeRequest) returns (RegisterAgentTypeResponse);
    rpc AddSubscription(AddSubscriptionRequest) returns (AddSubscriptionResponse);
    rpc RemoveSubscription(RemoveSubscriptionRequest) returns (RemoveSubscriptionResponse);
    rpc GetSubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsResponse);
}

服务方法说明

  • OpenChannel:主要消息传递通道,支持RPC和事件消息
  • OpenControlChannel:控制平面通道,用于状态管理和系统控制
  • RegisterAgent:代理类型注册,建立类型到处理器的映射
  • Add/RemoveSubscription:订阅管理,控制消息路由规则

消息结构定义

message Message {
    oneof message {
        RpcRequest request = 1;
        RpcResponse response = 2;
        io.cloudevents.v1.CloudEvent cloudEvent = 3;
    }
}

message RpcRequest {
    string request_id = 1;                // 请求唯一标识
    optional AgentId source = 2;          // 发送方代理ID(可选)
    AgentId target = 3;                   // 目标代理ID
    string method = 4;                    // 调用方法名
    Payload payload = 5;                  // 消息载荷
    map<string, string> metadata = 6;    // 元数据(追踪、路由等)
}

message RpcResponse {
    string request_id = 1;                // 对应请求ID
    Payload payload = 2;                  // 响应载荷
    string error = 3;                     // 错误信息(空表示成功)
    map<string, string> metadata = 4;    // 响应元数据
}

代理标识与载荷

message AgentId {
    string type = 1;                      // 代理类型名
    string key = 2;                       // 代理实例键
}

message Payload {
    string data_type = 1;                 // 数据类型标识符
    string data_content_type = 2;         // 内容类型(application/json等)
    bytes data = 3;                       // 序列化后的数据
}

设计要点

  • AgentId采用(type, key)二元组,支持同类型多实例
  • Payload支持多种序列化格式,默认JSON,可扩展Protobuf
  • request_id使用UUID v4,确保全局唯一性

订阅机制

message TypeSubscription {
    string topic_type = 1;                // 主题类型匹配
    string agent_type = 2;                // 处理代理类型
}

message TypePrefixSubscription {
    string topic_type_prefix = 1;         // 主题类型前缀匹配  
    string agent_type = 2;                // 处理代理类型
}

message Subscription {
    string id = 1;                        // 订阅唯一ID
    oneof subscription {
        TypeSubscription typeSubscription = 2;
        TypePrefixSubscription typePrefixSubscription = 3;
    }
}

订阅路由规则

  • TypeSubscription:精确类型匹配,支持实例分片
  • TypePrefixSubscription:前缀匹配,支持层级主题

CloudEvents集成

标准兼容性

AutoGen采用CNCF CloudEvents v1.0作为事件消息标准,确保与云原生生态系统的兼容性:

message CloudEvent {
    // 必填属性
    string id = 1;                        // 事件唯一标识
    string source = 2;                    // 事件源URI
    string spec_version = 3;              // CloudEvents规范版本
    string type = 4;                      // 事件类型
    
    // 可选和扩展属性
    map<string, CloudEventAttributeValue> attributes = 5;
    
    // 事件数据(二进制/文本/Protobuf)
    oneof data {
        bytes binary_data = 6;
        string text_data = 7;
        google.protobuf.Any proto_data = 8;
    }
}

事件属性扩展

message CloudEventAttributeValue {
    oneof attr {
        bool ce_boolean = 1;
        int32 ce_integer = 2;
        string ce_string = 3;
        bytes ce_bytes = 4;
        string ce_uri = 5;
        string ce_uri_ref = 6;
        google.protobuf.Timestamp ce_timestamp = 7;
    }
}

AutoGen特定属性

  • agenttype:发送方代理类型
  • agentkey:发送方代理实例键
  • correlation_id:关联ID,用于追踪相关事件
  • retry_count:重试次数
  • ttl:事件生存时间(秒)

事件消息示例

{
  "specversion": "1.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "source": "agent://task_processor/instance_1",
  "type": "com.autogen.task.completed",
  "time": "2024-01-15T10:30:00Z",
  "agenttype": "task_processor",
  "agentkey": "instance_1",
  "correlation_id": "batch_job_123",
  "datacontenttype": "application/json",
  "data": {
    "task_id": "task_456",
    "result": "处理完成",
    "processing_time": 1.25,
    "output_files": ["result.txt", "summary.json"]
  }
}

网络与安全

gRPC配置

# gRPC服务配置
grpc_server:
  bind_address: "0.0.0.0:50051"
  max_concurrent_streams: 1000
  max_receive_message_size: 16MB
  max_send_message_size: 16MB
  keepalive_time: 30s
  keepalive_timeout: 5s
  keepalive_permit_without_calls: true
  
# 客户端配置  
grpc_client:
  max_receive_message_size: 16MB
  max_send_message_size: 16MB
  keepalive_time: 30s
  keepalive_timeout: 5s
  initial_reconnect_backoff: 1s
  max_reconnect_backoff: 120s

TLS安全配置

# TLS配置
tls:
  enabled: true
  cert_file: "/etc/certs/server.crt"
  key_file: "/etc/certs/server.key"
  ca_file: "/etc/certs/ca.crt"
  client_auth: "require_and_verify"
  
# 认证配置
auth:
  method: "jwt"  # jwt, mutual_tls, api_key
  jwt_secret: "${JWT_SECRET}"
  token_expiry: "24h"

错误处理

// 错误代码定义
enum ErrorCode {
    UNKNOWN = 0;
    AGENT_NOT_FOUND = 1;
    SUBSCRIPTION_NOT_FOUND = 2;
    MESSAGE_TOO_LARGE = 3;
    RATE_LIMITED = 4;
    AUTH_FAILED = 5;
    INTERNAL_ERROR = 6;
}

// 错误响应格式
message ErrorResponse {
    ErrorCode code = 1;
    string message = 2;
    map<string, string> details = 3;
    string trace_id = 4;
}

性能与可靠性

消息传递保证

  • 至少一次投递:通过确认机制保证消息不丢失
  • 幂等性支持:消息去重,防止重复处理
  • 顺序保证:同一发送方的消息按发送顺序投递
  • 背压控制:基于滑动窗口的流控机制

性能指标

  • 延迟:单向消息传递 < 5ms(本地网络)
  • 吞吐量:单连接 > 10,000 msg/s(小消息)
  • 并发连接:单服务器支持 > 1,000个并发代理
  • 消息大小:默认限制16MB,可配置

监控与追踪

// 遥测数据
message TelemetryData {
    string trace_id = 1;
    string span_id = 2;
    int64 timestamp_ns = 3;
    map<string, string> tags = 4;
    repeated MetricValue metrics = 5;
}

message MetricValue {
    string name = 1;
    double value = 2;
    MetricType type = 3;
}

enum MetricType {
    COUNTER = 0;
    GAUGE = 1;
    HISTOGRAM = 2;
}