概述

本文档深入探讨AutoGen的高级使用模式和设计模式,涵盖复杂的多代理协作模式、企业级架构模式和性能优化模式,为构建大规模、高可靠的多代理系统提供指导。

1. 企业级架构模式

1.1 微服务代理模式 (Microservice Agent Pattern)

graph TB subgraph "微服务代理架构模式" subgraph "业务代理层" UA[用户管理代理] OA[订单处理代理] PA[支付代理] IA[库存代理] NA[通知代理] end subgraph "协调代理层" ORC[业务流程协调器] AGG[数据聚合器] SAGA[分布式事务管理器] end subgraph "基础设施代理层" LOG[日志代理] MON[监控代理] CACHE[缓存代理] AUTH[认证代理] end subgraph "外部集成代理层" EMAIL[邮件服务代理] SMS[短信服务代理] API[第三方API代理] end end ORC --> UA ORC --> OA ORC --> PA ORC --> IA OA --> SAGA PA --> SAGA IA --> SAGA UA --> AUTH OA --> LOG PA --> MON NA --> EMAIL NA --> SMS ORC --> API style ORC fill:#e1f5fe style SAGA fill:#f3e5f5 style AUTH fill:#e8f5e8

实现示例

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class BusinessProcessOrchestrator(RoutedAgent):
    """业务流程协调器 - 实现复杂业务流程的编排"""
    
    def __init__(self):
        super().__init__("业务流程协调器")
        self.saga_manager = SagaManager()
        self.process_cache = TTLCache(maxsize=1000, ttl=3600)
    
    @rpc
    async def orchestrate_order_process(
        self, 
        order_request: OrderRequest, 
        ctx: MessageContext
    ) -> OrderResult:
        """
        编排订单处理流程
        
        实现分布式事务的Saga模式,确保订单处理的一致性
        """
        
        # 创建分布式事务
        saga_id = await self.saga_manager.create_saga("order_process", {
            'order_id': order_request.order_id,
            'user_id': order_request.user_id,
            'items': order_request.items
        })
        
        try:
            # 步骤1: 验证用户
            user_validation = await self.send_message(
                UserValidationRequest(user_id=order_request.user_id),
                AgentId("UserManagementAgent", "default")
            )
            
            if not user_validation.is_valid:
                await self.saga_manager.abort_saga(saga_id, "用户验证失败")
                return OrderResult(success=False, error="用户验证失败")
            
            # 步骤2: 检查库存
            inventory_check = await self.send_message(
                InventoryCheckRequest(items=order_request.items),
                AgentId("InventoryAgent", "default")
            )
            
            if not inventory_check.available:
                await self.saga_manager.abort_saga(saga_id, "库存不足")
                return OrderResult(success=False, error="库存不足")
            
            # 步骤3: 预扣库存
            inventory_reserve = await self.send_message(
                InventoryReserveRequest(
                    items=order_request.items,
                    saga_id=saga_id
                ),
                AgentId("InventoryAgent", "default")
            )
            
            # 步骤4: 处理支付
            payment_result = await self.send_message(
                PaymentRequest(
                    amount=order_request.total_amount,
                    user_id=order_request.user_id,
                    saga_id=saga_id
                ),
                AgentId("PaymentAgent", "default")
            )
            
            if not payment_result.success:
                # 支付失败,回滚库存
                await self.saga_manager.compensate_step(saga_id, "inventory_reserve")
                return OrderResult(success=False, error="支付失败")
            
            # 步骤5: 创建订单
            order_creation = await self.send_message(
                OrderCreationRequest(
                    order_request=order_request,
                    payment_id=payment_result.payment_id,
                    saga_id=saga_id
                ),
                AgentId("OrderAgent", "default")
            )
            
            # 提交事务
            await self.saga_manager.commit_saga(saga_id)
            
            # 发送成功通知
            await self.publish_message(
                OrderCompletedEvent(
                    order_id=order_creation.order_id,
                    user_id=order_request.user_id
                ),
                TopicId("order.completed", "order_service")
            )
            
            return OrderResult(
                success=True,
                order_id=order_creation.order_id,
                saga_id=saga_id
            )
        
        except Exception as e:
            # 发生异常,回滚整个事务
            await self.saga_manager.abort_saga(saga_id, str(e))
            return OrderResult(success=False, error=str(e))

1.2 Event Sourcing模式

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class EventSourcingAgent(RoutedAgent):
    """事件溯源代理 - 实现基于事件的状态管理"""
    
    def __init__(self, name: str):
        super().__init__(f"事件溯源代理: {name}")
        self.event_store = EventStore()
        self.snapshots = SnapshotStore()
        self.current_state = {}
        self.version = 0
    
    async def handle_command(self, command: Command, ctx: MessageContext) -> CommandResult:
        """
        处理命令并生成事件
        
        Args:
            command: 业务命令
            ctx: 消息上下文
            
        Returns:
            CommandResult: 命令处理结果
        """
        
        try:
            # 1. 验证命令
            validation_result = await self._validate_command(command)
            if not validation_result.is_valid:
                return CommandResult(
                    success=False,
                    error=validation_result.error
                )
            
            # 2. 生成领域事件
            events = await self._generate_events_from_command(command)
            
            # 3. 持久化事件
            for event in events:
                await self.event_store.append_event(
                    stream_id=self.id.key,
                    event=event,
                    expected_version=self.version
                )
                self.version += 1
            
            # 4. 应用事件到当前状态
            for event in events:
                await self._apply_event_to_state(event)
            
            # 5. 创建快照 (每100个事件)
            if self.version % 100 == 0:
                await self._create_snapshot()
            
            # 6. 发布事件到其他代理
            for event in events:
                await self.publish_message(
                    event,
                    TopicId(f"domain.{event.event_type}", self.id.key)
                )
            
            return CommandResult(
                success=True,
                events_generated=len(events),
                new_version=self.version
            )
        
        except Exception as e:
            return CommandResult(
                success=False,
                error=str(e)
            )
    
    async def _generate_events_from_command(self, command: Command) -> List[DomainEvent]:
        """从命令生成领域事件"""
        
        events = []
        
        if isinstance(command, CreateUserCommand):
            events.append(UserCreatedEvent(
                user_id=command.user_id,
                name=command.name,
                email=command.email,
                created_at=datetime.utcnow()
            ))
        
        elif isinstance(command, UpdateUserCommand):
            # 检查用户是否存在
            if command.user_id not in self.current_state.get('users', {}):
                raise ValueError(f"用户不存在: {command.user_id}")
            
            events.append(UserUpdatedEvent(
                user_id=command.user_id,
                changes=command.changes,
                updated_at=datetime.utcnow()
            ))
        
        elif isinstance(command, DeleteUserCommand):
            if command.user_id not in self.current_state.get('users', {}):
                raise ValueError(f"用户不存在: {command.user_id}")
            
            events.append(UserDeletedEvent(
                user_id=command.user_id,
                deleted_at=datetime.utcnow()
            ))
        
        return events
    
    async def _apply_event_to_state(self, event: DomainEvent) -> None:
        """将事件应用到当前状态"""
        
        if isinstance(event, UserCreatedEvent):
            if 'users' not in self.current_state:
                self.current_state['users'] = {}
            
            self.current_state['users'][event.user_id] = {
                'name': event.name,
                'email': event.email,
                'created_at': event.created_at,
                'status': 'active'
            }
        
        elif isinstance(event, UserUpdatedEvent):
            if event.user_id in self.current_state.get('users', {}):
                self.current_state['users'][event.user_id].update(event.changes)
        
        elif isinstance(event, UserDeletedEvent):
            if event.user_id in self.current_state.get('users', {}):
                self.current_state['users'][event.user_id]['status'] = 'deleted'
    
    async def rebuild_state_from_events(self, up_to_version: Optional[int] = None) -> None:
        """从事件重建状态"""
        
        # 1. 加载最近的快照
        latest_snapshot = await self.snapshots.get_latest_snapshot(self.id.key)
        if latest_snapshot:
            self.current_state = latest_snapshot.state
            self.version = latest_snapshot.version
            start_version = latest_snapshot.version + 1
        else:
            self.current_state = {}
            self.version = 0
            start_version = 0
        
        # 2. 重放事件
        events = await self.event_store.get_events(
            stream_id=self.id.key,
            from_version=start_version,
            to_version=up_to_version
        )
        
        for event in events:
            await self._apply_event_to_state(event)
            self.version += 1

1.3 CQRS模式 (Command Query Responsibility Segregation)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class CQRSAgent(RoutedAgent):
    """CQRS代理 - 命令查询职责分离"""
    
    def __init__(self, name: str):
        super().__init__(f"CQRS代理: {name}")
        self.command_handlers = {}
        self.query_handlers = {}
        self.write_store = WriteModelStore()
        self.read_store = ReadModelStore()
    
    # 命令处理 (写操作)
    @message_handler
    async def handle_command(self, command: Command, ctx: MessageContext) -> CommandResult:
        """处理命令 - 只负责写操作和状态变更"""
        
        command_type = type(command).__name__
        
        if command_type not in self.command_handlers:
            raise CantHandleException(f"不支持的命令类型: {command_type}")
        
        handler = self.command_handlers[command_type]
        
        # 执行命令处理
        result = await handler(command, ctx)
        
        # 更新写模型
        if result.success:
            await self.write_store.apply_changes(result.changes)
            
            # 发布领域事件
            for event in result.events:
                await self.publish_message(
                    event,
                    TopicId(f"domain.{event.event_type}", self.id.key)
                )
        
        return result
    
    # 查询处理 (读操作)
    @message_handler
    async def handle_query(self, query: Query, ctx: MessageContext) -> QueryResult:
        """处理查询 - 只负责读操作和数据检索"""
        
        query_type = type(query).__name__
        
        if query_type not in self.query_handlers:
            raise CantHandleException(f"不支持的查询类型: {query_type}")
        
        handler = self.query_handlers[query_type]
        
        # 执行查询处理
        result = await handler(query, ctx)
        
        return result
    
    def register_command_handler(self, command_type: type, handler: Callable) -> None:
        """注册命令处理器"""
        self.command_handlers[command_type.__name__] = handler
    
    def register_query_handler(self, query_type: type, handler: Callable) -> None:
        """注册查询处理器"""
        self.query_handlers[query_type.__name__] = handler

# 使用示例
class UserManagementAgent(CQRSAgent):
    """用户管理代理 - CQRS模式实现"""
    
    def __init__(self):
        super().__init__("用户管理")
        
        # 注册命令处理器
        self.register_command_handler(CreateUserCommand, self._handle_create_user)
        self.register_command_handler(UpdateUserCommand, self._handle_update_user)
        
        # 注册查询处理器
        self.register_query_handler(GetUserQuery, self._handle_get_user)
        self.register_query_handler(ListUsersQuery, self._handle_list_users)
    
    async def _handle_create_user(self, command: CreateUserCommand, ctx: MessageContext) -> CommandResult:
        """处理创建用户命令"""
        
        # 业务逻辑验证
        if await self._user_exists(command.email):
            return CommandResult(
                success=False,
                error="用户邮箱已存在"
            )
        
        # 生成事件
        user_created_event = UserCreatedEvent(
            user_id=str(uuid.uuid4()),
            name=command.name,
            email=command.email,
            created_at=datetime.utcnow()
        )
        
        return CommandResult(
            success=True,
            events=[user_created_event],
            changes={'users': {user_created_event.user_id: user_created_event.to_dict()}}
        )
    
    async def _handle_get_user(self, query: GetUserQuery, ctx: MessageContext) -> QueryResult:
        """处理获取用户查询"""
        
        # 从读模型获取数据
        user_data = await self.read_store.get_user(query.user_id)
        
        if not user_data:
            return QueryResult(
                success=False,
                error="用户不存在"
            )
        
        return QueryResult(
            success=True,
            data=user_data
        )

2. 高级协作模式

2.1 层次化决策模式 (Hierarchical Decision Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class HierarchicalDecisionSystem:
    """层次化决策系统 - 实现多层级的决策流程"""
    
    def __init__(self):
        self.decision_layers = {
            'operational': OperationalDecisionAgent(),
            'tactical': TacticalDecisionAgent(), 
            'strategic': StrategicDecisionAgent()
        }
        self.escalation_rules = EscalationRuleEngine()
    
    async def make_decision(self, decision_request: DecisionRequest) -> DecisionResult:
        """
        执行层次化决策流程
        
        Args:
            decision_request: 决策请求
            
        Returns:
            DecisionResult: 决策结果
        """
        
        current_layer = 'operational'
        decision_context = DecisionContext(request=decision_request)
        
        while current_layer:
            # 当前层级尝试决策
            layer_agent = self.decision_layers[current_layer]
            
            try:
                decision_result = await layer_agent.make_decision(decision_context)
                
                if decision_result.confidence > 0.8:
                    # 高置信度,直接返回决策
                    return decision_result
                
                elif decision_result.confidence > 0.5:
                    # 中等置信度,寻求同级意见
                    peer_opinions = await self._get_peer_opinions(current_layer, decision_context)
                    consensus_result = await self._build_consensus(decision_result, peer_opinions)
                    
                    if consensus_result.confidence > 0.8:
                        return consensus_result
                
                # 低置信度或无共识,升级到上层
                next_layer = await self.escalation_rules.get_next_layer(
                    current_layer, decision_context
                )
                
                if next_layer:
                    decision_context.add_layer_input(current_layer, decision_result)
                    current_layer = next_layer
                else:
                    # 已达最高层,返回最佳努力结果
                    return decision_result
            
            except Exception as e:
                # 当前层决策失败,尝试升级
                decision_context.add_error(current_layer, str(e))
                current_layer = await self.escalation_rules.get_next_layer(
                    current_layer, decision_context
                )
        
        # 所有层级都失败
        return DecisionResult(
            success=False,
            error="所有决策层级都无法处理该请求"
        )

class OperationalDecisionAgent(RoutedAgent):
    """操作层决策代理"""
    
    @rpc
    async def make_decision(self, context: DecisionContext) -> DecisionResult:
        """执行操作层决策"""
        
        request = context.request
        
        # 操作层决策逻辑 - 处理日常操作决策
        if request.decision_type == "resource_allocation":
            return await self._decide_resource_allocation(request)
        elif request.decision_type == "task_prioritization":
            return await self._decide_task_priority(request)
        else:
            return DecisionResult(
                success=False,
                confidence=0.0,
                reason="操作层无法处理此类型决策"
            )
    
    async def _decide_resource_allocation(self, request: DecisionRequest) -> DecisionResult:
        """资源分配决策"""
        
        available_resources = request.context.get('available_resources', {})
        required_resources = request.context.get('required_resources', {})
        
        # 简单的资源分配算法
        allocation_plan = {}
        confidence = 1.0
        
        for resource_type, required_amount in required_resources.items():
            available_amount = available_resources.get(resource_type, 0)
            
            if available_amount >= required_amount:
                allocation_plan[resource_type] = required_amount
            else:
                allocation_plan[resource_type] = available_amount
                confidence *= 0.7  # 降低置信度
        
        return DecisionResult(
            success=True,
            confidence=confidence,
            decision_data=allocation_plan,
            reason="基于可用资源的分配方案"
        )

class TacticalDecisionAgent(RoutedAgent):
    """战术层决策代理"""
    
    @rpc
    async def make_decision(self, context: DecisionContext) -> DecisionResult:
        """执行战术层决策"""
        
        # 战术层决策逻辑 - 处理中期规划和优化决策
        request = context.request
        
        if request.decision_type == "process_optimization":
            return await self._optimize_process(request, context)
        elif request.decision_type == "capacity_planning":
            return await self._plan_capacity(request, context)
        else:
            # 结合下层输入进行决策
            operational_input = context.get_layer_input('operational')
            return await self._make_tactical_decision(request, operational_input)

2.2 管道处理模式 (Pipeline Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
class ProcessingPipeline:
    """处理管道 - 实现可组合的处理流水线"""
    
    def __init__(self, name: str):
        self.name = name
        self.stages = []
        self.error_handlers = {}
        self.metrics_collector = PipelineMetricsCollector()
    
    def add_stage(self, stage: PipelineStage) -> 'ProcessingPipeline':
        """添加处理阶段"""
        self.stages.append(stage)
        return self
    
    def add_error_handler(self, error_type: type, handler: Callable) -> 'ProcessingPipeline':
        """添加错误处理器"""
        self.error_handlers[error_type] = handler
        return self
    
    async def process(self, input_data: Any) -> PipelineResult:
        """执行管道处理"""
        
        pipeline_id = str(uuid.uuid4())
        start_time = time.time()
        
        try:
            current_data = input_data
            stage_results = []
            
            for stage_index, stage in enumerate(self.stages):
                stage_start_time = time.time()
                
                try:
                    # 执行阶段处理
                    stage_result = await stage.process(current_data)
                    
                    # 记录阶段结果
                    stage_execution = StageExecution(
                        stage_name=stage.name,
                        input_data=current_data,
                        output_data=stage_result.output,
                        duration=time.time() - stage_start_time,
                        success=stage_result.success
                    )
                    stage_results.append(stage_execution)
                    
                    if not stage_result.success:
                        # 阶段失败,检查是否有错误处理器
                        if stage_result.error_type in self.error_handlers:
                            handler = self.error_handlers[stage_result.error_type]
                            recovery_result = await handler(stage_result.error, current_data)
                            
                            if recovery_result.should_continue:
                                current_data = recovery_result.recovered_data
                                continue
                        
                        # 无法恢复,管道失败
                        return PipelineResult(
                            pipeline_id=pipeline_id,
                            success=False,
                            error=f"阶段 {stage.name} 失败: {stage_result.error}",
                            stage_results=stage_results,
                            duration=time.time() - start_time
                        )
                    
                    # 阶段成功,更新数据
                    current_data = stage_result.output
                
                except Exception as e:
                    # 未捕获的异常
                    await self.metrics_collector.record_stage_error(
                        pipeline_name=self.name,
                        stage_name=stage.name,
                        error=str(e)
                    )
                    
                    return PipelineResult(
                        pipeline_id=pipeline_id,
                        success=False,
                        error=f"阶段 {stage.name} 异常: {str(e)}",
                        stage_results=stage_results,
                        duration=time.time() - start_time
                    )
            
            # 所有阶段成功完成
            total_duration = time.time() - start_time
            
            await self.metrics_collector.record_pipeline_success(
                pipeline_name=self.name,
                duration=total_duration,
                stages_count=len(self.stages)
            )
            
            return PipelineResult(
                pipeline_id=pipeline_id,
                success=True,
                output=current_data,
                stage_results=stage_results,
                duration=total_duration
            )
        
        except Exception as e:
            return PipelineResult(
                pipeline_id=pipeline_id,
                success=False,
                error=f"管道执行异常: {str(e)}",
                duration=time.time() - start_time
            )

# 管道阶段实现
class DataValidationStage(PipelineStage):
    """数据验证阶段"""
    
    def __init__(self, validation_rules: List[ValidationRule]):
        super().__init__("数据验证")
        self.validation_rules = validation_rules
    
    async def process(self, data: Any) -> StageResult:
        """执行数据验证"""
        
        validation_errors = []
        
        for rule in self.validation_rules:
            if not await rule.validate(data):
                validation_errors.append(rule.error_message)
        
        if validation_errors:
            return StageResult(
                success=False,
                error=f"验证失败: {'; '.join(validation_errors)}",
                error_type=ValidationError
            )
        
        return StageResult(
            success=True,
            output=data  # 验证通过,数据不变
        )

class DataTransformationStage(PipelineStage):
    """数据转换阶段"""
    
    def __init__(self, transformation_rules: List[TransformationRule]):
        super().__init__("数据转换")
        self.transformation_rules = transformation_rules
    
    async def process(self, data: Any) -> StageResult:
        """执行数据转换"""
        
        transformed_data = data
        
        for rule in self.transformation_rules:
            transformed_data = await rule.transform(transformed_data)
        
        return StageResult(
            success=True,
            output=transformed_data
        )

class DataEnrichmentStage(PipelineStage):
    """数据丰富化阶段"""
    
    def __init__(self, enrichment_sources: List[EnrichmentSource]):
        super().__init__("数据丰富化")
        self.enrichment_sources = enrichment_sources
    
    async def process(self, data: Any) -> StageResult:
        """执行数据丰富化"""
        
        enriched_data = dict(data) if isinstance(data, dict) else {'original_data': data}
        
        # 并行从多个源丰富数据
        enrichment_tasks = [
            source.enrich(data) for source in self.enrichment_sources
        ]
        
        enrichment_results = await asyncio.gather(*enrichment_tasks, return_exceptions=True)
        
        for i, result in enumerate(enrichment_results):
            source_name = self.enrichment_sources[i].name
            
            if isinstance(result, Exception):
                enriched_data[f'{source_name}_error'] = str(result)
            else:
                enriched_data[f'{source_name}_data'] = result
        
        return StageResult(
            success=True,
            output=enriched_data
        )

# 构建和使用管道
async def build_data_processing_pipeline():
    """构建数据处理管道"""
    
    pipeline = ProcessingPipeline("数据处理管道") \
        .add_stage(DataValidationStage([
            EmailValidationRule(),
            RequiredFieldsValidationRule(['name', 'email'])
        ])) \
        .add_stage(DataTransformationStage([
            LowercaseEmailRule(),
            TrimWhitespaceRule()
        ])) \
        .add_stage(DataEnrichmentStage([
            UserProfileEnrichmentSource(),
            GeolocationEnrichmentSource()
        ])) \
        .add_error_handler(ValidationError, validation_error_handler) \
        .add_error_handler(TransformationError, transformation_error_handler)
    
    return pipeline

# 管道代理
class PipelineAgent(RoutedAgent):
    """管道处理代理"""
    
    def __init__(self, pipeline: ProcessingPipeline):
        super().__init__(f"管道代理: {pipeline.name}")
        self.pipeline = pipeline
    
    @rpc
    async def process_data(self, data: Any, ctx: MessageContext) -> PipelineResult:
        """通过管道处理数据"""
        return await self.pipeline.process(data)

2.3 智能路由模式 (Smart Routing Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class SmartRoutingAgent(RoutedAgent):
    """智能路由代理 - 基于机器学习的动态路由"""
    
    def __init__(self):
        super().__init__("智能路由代理")
        self.ml_router = MLRoutingModel()
        self.routing_history = []
        self.agent_performance_tracker = AgentPerformanceTracker()
    
    @message_handler
    async def route_request(self, request: RoutingRequest, ctx: MessageContext) -> RoutingResult:
        """智能路由请求"""
        
        # 1. 提取请求特征
        features = await self._extract_request_features(request)
        
        # 2. 获取候选代理列表
        candidate_agents = await self._get_candidate_agents(request.request_type)
        
        # 3. 为每个候选代理评分
        agent_scores = {}
        for agent_id in candidate_agents:
            score = await self._score_agent_for_request(agent_id, features)
            agent_scores[agent_id] = score
        
        # 4. 选择最佳代理
        best_agent = max(agent_scores.keys(), key=lambda a: agent_scores[a])
        
        # 5. 路由请求
        try:
            routing_start_time = time.time()
            
            result = await self.send_message(
                request.payload,
                best_agent
            )
            
            routing_duration = time.time() - routing_start_time
            
            # 6. 记录路由结果用于学习
            await self._record_routing_outcome(
                request=request,
                selected_agent=best_agent,
                result=result,
                duration=routing_duration,
                success=True
            )
            
            return RoutingResult(
                success=True,
                selected_agent=best_agent,
                result=result,
                confidence=agent_scores[best_agent]
            )
        
        except Exception as e:
            # 路由失败,记录并尝试备选代理
            await self._record_routing_outcome(
                request=request,
                selected_agent=best_agent,
                error=str(e),
                success=False
            )
            
            # 尝试次优代理
            if len(agent_scores) > 1:
                second_best_agent = sorted(
                    agent_scores.keys(), 
                    key=lambda a: agent_scores[a],
                    reverse=True
                )[1]
                
                try:
                    fallback_result = await self.send_message(
                        request.payload,
                        second_best_agent
                    )
                    
                    return RoutingResult(
                        success=True,
                        selected_agent=second_best_agent,
                        result=fallback_result,
                        confidence=agent_scores[second_best_agent],
                        fallback_used=True
                    )
                except Exception as fallback_error:
                    pass
            
            return RoutingResult(
                success=False,
                error=str(e)
            )
    
    async def _score_agent_for_request(self, agent_id: AgentId, features: Dict[str, Any]) -> float:
        """为特定请求给代理评分"""
        
        # 1. 基于历史性能评分
        performance_stats = await self.agent_performance_tracker.get_stats(agent_id)
        performance_score = (
            performance_stats.success_rate * 0.4 +
            (1 - performance_stats.avg_response_time / 10.0) * 0.3 +
            (1 - performance_stats.current_load) * 0.3
        )
        
        # 2. 基于特征匹配评分
        feature_score = await self.ml_router.predict_compatibility(agent_id, features)
        
        # 3. 基于当前状态评分
        current_load = await self._get_agent_current_load(agent_id)
        load_score = max(0, 1 - current_load)
        
        # 4. 综合评分
        final_score = (
            performance_score * 0.5 +
            feature_score * 0.3 +
            load_score * 0.2
        )
        
        return max(0.0, min(1.0, final_score))
    
    async def update_ml_model(self) -> None:
        """更新机器学习路由模型"""
        
        if len(self.routing_history) < 100:
            return  # 数据不足,跳过训练
        
        # 准备训练数据
        training_data = []
        for record in self.routing_history[-1000:]:  # 使用最近1000条记录
            features = record.features
            target = 1.0 if record.success else 0.0
            
            training_data.append({
                'features': features,
                'agent_id': record.selected_agent,
                'target': target
            })
        
        # 训练模型
        await self.ml_router.train(training_data)
        
        # 清理历史数据
        self.routing_history = self.routing_history[-500:]  # 保留最近500条

3. 高级集成模式

3.1 适配器模式 (Adapter Pattern)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class LegacySystemAdapter(RoutedAgent):
    """遗留系统适配器 - 集成现有系统到AutoGen"""
    
    def __init__(self, legacy_system_config: LegacySystemConfig):
        super().__init__(f"遗留系统适配器: {legacy_system_config.system_name}")
        self.legacy_client = LegacySystemClient(legacy_system_config)
        self.message_translator = MessageTranslator()
        self.response_mapper = ResponseMapper()
    
    @rpc
    async def legacy_operation(self, request: LegacyOperationRequest, ctx: MessageContext) -> LegacyOperationResponse:
        """执行遗留系统操作"""
        
        try:
            # 1. 转换AutoGen消息为遗留系统格式
            legacy_request = await self.message_translator.to_legacy_format(
                request, self.legacy_client.get_schema()
            )
            
            # 2. 调用遗留系统API
            legacy_response = await self.legacy_client.call_api(
                endpoint=request.operation,
                data=legacy_request,
                timeout=30.0
            )
            
            # 3. 转换遗留系统响应为AutoGen格式
            autogen_response = await self.response_mapper.from_legacy_format(
                legacy_response, LegacyOperationResponse
            )
            
            return autogen_response
        
        except LegacySystemError as e:
            # 遗留系统特定错误处理
            return LegacyOperationResponse(
                success=False,
                error=f"遗留系统错误: {e.error_code} - {e.message}",
                error_code=e.error_code
            )
        
        except Exception as e:
            return LegacyOperationResponse(
                success=False,
                error=f"适配器错误: {str(e)}"
            )

class MessageTranslator:
    """消息转换器"""
    
    async def to_legacy_format(self, autogen_message: Any, legacy_schema: Dict) -> Dict[str, Any]:
        """将AutoGen消息转换为遗留系统格式"""
        
        # 基于schema进行字段映射
        legacy_message = {}
        field_mappings = self._get_field_mappings(type(autogen_message), legacy_schema)
        
        for autogen_field, legacy_field in field_mappings.items():
            if hasattr(autogen_message, autogen_field):
                value = getattr(autogen_message, autogen_field)
                
                # 类型转换
                converted_value = await self._convert_field_value(
                    value, legacy_schema.get(legacy_field, {}).get('type', 'string')
                )
                
                legacy_message[legacy_field] = converted_value
        
        return legacy_message
    
    def _get_field_mappings(self, autogen_type: type, legacy_schema: Dict) -> Dict[str, str]:
        """获取字段映射关系"""
        
        # 可以通过配置文件或注解定义映射关系
        mappings = {
            'user_id': 'userId',
            'created_at': 'createTime',
            'updated_at': 'updateTime',
            'is_active': 'status'  # bool -> string 映射
        }
        
        return mappings

3.2 代理池模式 (Agent Pool Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class AgentPool:
    """代理池 - 管理同类型代理的池化和复用"""
    
    def __init__(
        self, 
        agent_type: str, 
        min_size: int = 5, 
        max_size: int = 50,
        scale_threshold: float = 0.8
    ):
        self.agent_type = agent_type
        self.min_size = min_size
        self.max_size = max_size
        self.scale_threshold = scale_threshold
        
        self.available_agents = asyncio.Queue()
        self.busy_agents = set()
        self.total_agents = 0
        self.pool_lock = asyncio.Lock()
        
        self.usage_stats = {
            'requests_served': 0,
            'average_wait_time': 0.0,
            'peak_usage': 0
        }
    
    async def initialize_pool(self, runtime: AgentRuntime) -> None:
        """初始化代理池"""
        
        # 创建最小数量的代理
        for i in range(self.min_size):
            agent_id = AgentId(self.agent_type, f"pool_{i}")
            await runtime.get(agent_id, lazy=False)  # 立即创建
            await self.available_agents.put(agent_id)
            self.total_agents += 1
    
    async def acquire_agent(self, timeout: float = 30.0) -> AgentId:
        """从池中获取可用代理"""
        
        acquire_start_time = time.time()
        
        try:
            # 1. 尝试从可用队列获取代理
            agent_id = await asyncio.wait_for(
                self.available_agents.get(),
                timeout=timeout
            )
            
            # 2. 标记为忙碌
            async with self.pool_lock:
                self.busy_agents.add(agent_id)
                
                # 更新使用统计
                wait_time = time.time() - acquire_start_time
                self.usage_stats['requests_served'] += 1
                self.usage_stats['average_wait_time'] = (
                    self.usage_stats['average_wait_time'] * 0.9 + wait_time * 0.1
                )
                self.usage_stats['peak_usage'] = max(
                    self.usage_stats['peak_usage'], 
                    len(self.busy_agents)
                )
            
            # 3. 检查是否需要扩容
            current_usage = len(self.busy_agents) / self.total_agents
            if (current_usage > self.scale_threshold and 
                self.total_agents < self.max_size):
                asyncio.create_task(self._scale_up())
            
            return agent_id
        
        except asyncio.TimeoutError:
            # 获取代理超时,尝试紧急扩容
            if self.total_agents < self.max_size:
                emergency_agent = await self._create_emergency_agent()
                if emergency_agent:
                    return emergency_agent
            
            raise RuntimeError(f"无法在 {timeout} 秒内获取可用代理")
    
    async def release_agent(self, agent_id: AgentId) -> None:
        """释放代理回池中"""
        
        async with self.pool_lock:
            if agent_id in self.busy_agents:
                self.busy_agents.remove(agent_id)
                await self.available_agents.put(agent_id)
            
            # 检查是否需要缩容
            if (self.available_agents.qsize() > self.min_size and 
                len(self.busy_agents) < self.total_agents * 0.3):
                asyncio.create_task(self._scale_down())
    
    async def _scale_up(self) -> None:
        """扩容代理池"""
        
        if self.total_agents >= self.max_size:
            return
        
        try:
            # 创建新代理
            new_agent_id = AgentId(self.agent_type, f"pool_{self.total_agents}")
            runtime = self._get_current_runtime()
            await runtime.get(new_agent_id, lazy=False)
            
            # 添加到可用队列
            await self.available_agents.put(new_agent_id)
            self.total_agents += 1
            
            logger.info(f"代理池扩容: {self.agent_type}, 当前大小: {self.total_agents}")
        
        except Exception as e:
            logger.error(f"代理池扩容失败: {e}")

class PoolManagedAgent(RoutedAgent):
    """池管理代理 - 使用代理池的示例"""
    
    def __init__(self, agent_pools: Dict[str, AgentPool]):
        super().__init__("池管理代理")
        self.agent_pools = agent_pools
    
    @rpc
    async def process_with_pool(self, request: PooledRequest, ctx: MessageContext) -> PooledResponse:
        """使用代理池处理请求"""
        
        pool = self.agent_pools.get(request.required_agent_type)
        if not pool:
            return PooledResponse(
                success=False,
                error=f"未找到代理池: {request.required_agent_type}"
            )
        
        # 从池中获取代理
        agent_id = await pool.acquire_agent(timeout=request.timeout)
        
        try:
            # 使用代理处理请求
            result = await self.send_message(request.payload, agent_id)
            
            return PooledResponse(
                success=True,
                result=result,
                agent_id=str(agent_id)
            )
        
        finally:
            # 释放代理回池中
            await pool.release_agent(agent_id)

4. 高级工具模式

4.1 工具链模式 (Tool Chain Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class ToolChain:
    """工具链 - 组合多个工具实现复杂功能"""
    
    def __init__(self, name: str):
        self.name = name
        self.tools = []
        self.execution_strategy = 'sequential'  # sequential, parallel, conditional
        self.error_handling = 'fail_fast'  # fail_fast, continue, retry
    
    def add_tool(self, tool: BaseTool, condition: Optional[Callable] = None) -> 'ToolChain':
        """添加工具到链中"""
        
        self.tools.append(ToolChainStep(
            tool=tool,
            condition=condition or (lambda ctx: True)
        ))
        return self
    
    def set_execution_strategy(self, strategy: str) -> 'ToolChain':
        """设置执行策略"""
        self.execution_strategy = strategy
        return self
    
    async def execute(self, input_data: Any, context: Dict[str, Any] = None) -> ToolChainResult:
        """执行工具链"""
        
        context = context or {}
        execution_context = ToolChainExecutionContext(
            input_data=input_data,
            context=context,
            results={}
        )
        
        if self.execution_strategy == 'sequential':
            return await self._execute_sequential(execution_context)
        elif self.execution_strategy == 'parallel':
            return await self._execute_parallel(execution_context)
        elif self.execution_strategy == 'conditional':
            return await self._execute_conditional(execution_context)
        else:
            raise ValueError(f"不支持的执行策略: {self.execution_strategy}")
    
    async def _execute_sequential(self, context: ToolChainExecutionContext) -> ToolChainResult:
        """顺序执行工具链"""
        
        current_data = context.input_data
        step_results = []
        
        for step_index, step in enumerate(self.tools):
            # 检查执行条件
            if not step.condition(context):
                continue
            
            try:
                # 执行工具
                step_result = await step.tool.run(current_data)
                
                # 记录步骤结果
                step_execution = ToolStepExecution(
                    step_index=step_index,
                    tool_name=step.tool.name,
                    input_data=current_data,
                    output_data=step_result,
                    success=True,
                    duration=time.time()
                )
                step_results.append(step_execution)
                
                # 更新上下文
                context.results[step.tool.name] = step_result
                current_data = step_result  # 下一步的输入
            
            except Exception as e:
                step_execution = ToolStepExecution(
                    step_index=step_index,
                    tool_name=step.tool.name,
                    input_data=current_data,
                    error=str(e),
                    success=False
                )
                step_results.append(step_execution)
                
                if self.error_handling == 'fail_fast':
                    return ToolChainResult(
                        success=False,
                        error=f"工具 {step.tool.name} 执行失败: {e}",
                        step_results=step_results
                    )
                elif self.error_handling == 'continue':
                    continue  # 跳过失败的工具
                elif self.error_handling == 'retry':
                    # 实现重试逻辑
                    retry_result = await self._retry_tool_execution(step, current_data)
                    if retry_result.success:
                        current_data = retry_result.output
                    else:
                        continue
        
        return ToolChainResult(
            success=True,
            output=current_data,
            step_results=step_results
        )

class AdvancedToolAgent(RoutedAgent):
    """高级工具代理 - 支持复杂工具编排"""
    
    def __init__(self):
        super().__init__("高级工具代理")
        self.tool_chains = {}
        self.tool_registry = ToolRegistry()
    
    async def register_tool_chain(self, name: str, chain: ToolChain) -> None:
        """注册工具链"""
        self.tool_chains[name] = chain
    
    @rpc
    async def execute_tool_chain(
        self, 
        request: ToolChainRequest, 
        ctx: MessageContext
    ) -> ToolChainResponse:
        """执行工具链"""
        
        if request.chain_name not in self.tool_chains:
            return ToolChainResponse(
                success=False,
                error=f"未找到工具链: {request.chain_name}"
            )
        
        chain = self.tool_chains[request.chain_name]
        
        try:
            result = await chain.execute(
                input_data=request.input_data,
                context=request.context
            )
            
            return ToolChainResponse(
                success=result.success,
                output=result.output,
                step_results=result.step_results,
                error=result.error
            )
        
        except Exception as e:
            return ToolChainResponse(
                success=False,
                error=f"工具链执行异常: {str(e)}"
            )

# 工具链使用示例
async def setup_data_analysis_chain():
    """设置数据分析工具链"""
    
    # 创建工具链
    analysis_chain = ToolChain("数据分析链") \
        .add_tool(DataValidationTool()) \
        .add_tool(DataCleaningTool()) \
        .add_tool(StatisticalAnalysisTool()) \
        .add_tool(VisualizationTool()) \
        .set_execution_strategy('sequential')
    
    # 注册到代理
    tool_agent = AdvancedToolAgent()
    await tool_agent.register_tool_chain("data_analysis", analysis_chain)
    
    return tool_agent

4.2 观察者模式 (Observer Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class EventBus(RoutedAgent):
    """事件总线 - 实现发布订阅的观察者模式"""
    
    def __init__(self):
        super().__init__("事件总线")
        self.event_subscriptions = defaultdict(list)
        self.event_filters = {}
        self.dead_letter_queue = DeadLetterQueue()
    
    async def subscribe_to_event(
        self, 
        event_type: str, 
        subscriber: AgentId,
        filter_condition: Optional[Callable] = None
    ) -> str:
        """订阅事件"""
        
        subscription_id = str(uuid.uuid4())
        
        subscription = EventSubscription(
            id=subscription_id,
            event_type=event_type,
            subscriber=subscriber,
            filter_condition=filter_condition,
            created_at=datetime.utcnow()
        )
        
        self.event_subscriptions[event_type].append(subscription)
        
        return subscription_id
    
    @event
    async def handle_domain_event(self, event: DomainEvent, ctx: MessageContext) -> None:
        """处理领域事件并分发给订阅者"""
        
        event_type = event.event_type
        subscribers = self.event_subscriptions.get(event_type, [])
        
        if not subscribers:
            # 没有订阅者,记录到死信队列
            await self.dead_letter_queue.add(event, "无订阅者")
            return
        
        # 并行分发给所有订阅者
        delivery_tasks = []
        
        for subscription in subscribers:
            # 检查过滤条件
            if subscription.filter_condition and not subscription.filter_condition(event):
                continue
            
            # 创建分发任务
            task = asyncio.create_task(
                self._deliver_event_to_subscriber(event, subscription)
            )
            delivery_tasks.append(task)
        
        # 等待所有分发完成
        if delivery_tasks:
            results = await asyncio.gather(*delivery_tasks, return_exceptions=True)
            
            # 处理分发失败
            failed_deliveries = [
                result for result in results 
                if isinstance(result, Exception)
            ]
            
            if failed_deliveries:
                logger.warning(f"事件分发失败: {len(failed_deliveries)}/{len(delivery_tasks)}")
    
    async def _deliver_event_to_subscriber(
        self, 
        event: DomainEvent, 
        subscription: EventSubscription
    ) -> None:
        """分发事件给订阅者"""
        
        try:
            await self.send_message(
                event,
                subscription.subscriber
            )
        except Exception as e:
            # 分发失败,添加到死信队列
            await self.dead_letter_queue.add(
                event, 
                f"分发到 {subscription.subscriber} 失败: {str(e)}"
            )
            raise

class SmartEventFilter:
    """智能事件过滤器"""
    
    def __init__(self):
        self.filter_rules = []
        self.ml_filter = MLEventFilter()
    
    def add_rule(self, rule: FilterRule) -> None:
        """添加过滤规则"""
        self.filter_rules.append(rule)
    
    async def should_deliver(self, event: DomainEvent, subscriber: AgentId) -> bool:
        """判断是否应该分发事件"""
        
        # 1. 应用规则过滤
        for rule in self.filter_rules:
            if not rule.matches(event, subscriber):
                return False
        
        # 2. 应用机器学习过滤
        relevance_score = await self.ml_filter.predict_relevance(event, subscriber)
        return relevance_score > 0.7
    
    async def train_filter(self, training_data: List[EventDeliveryRecord]) -> None:
        """训练过滤器"""
        await self.ml_filter.train(training_data)

5. 高级状态管理模式

5.1 状态机代理模式 (State Machine Agent Pattern)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class StateMachineAgent(RoutedAgent):
    """状态机代理 - 基于状态机的复杂业务流程管理"""
    
    def __init__(self, name: str, state_machine_config: StateMachineConfig):
        super().__init__(f"状态机代理: {name}")
        self.state_machine = StateMachine(state_machine_config)
        self.current_state = state_machine_config.initial_state
        self.state_history = []
        self.transition_handlers = {}
    
    @rpc
    async def trigger_transition(
        self, 
        trigger: StateTrigger, 
        ctx: MessageContext
    ) -> StateTransitionResult:
        """触发状态转换"""
        
        transition_id = str(uuid.uuid4())
        start_time = datetime.utcnow()
        
        try:
            # 1. 验证转换合法性
            valid_transitions = self.state_machine.get_valid_transitions(self.current_state)
            
            target_state = None
            for transition in valid_transitions:
                if transition.trigger == trigger.trigger_type:
                    target_state = transition.target_state
                    break
            
            if not target_state:
                return StateTransitionResult(
                    success=False,
                    error=f"从状态 {self.current_state} 无法通过触发器 {trigger.trigger_type} 进行转换"
                )
            
            # 2. 执行转换前处理
            pre_transition_result = await self._execute_pre_transition(
                self.current_state, target_state, trigger
            )
            
            if not pre_transition_result.success:
                return StateTransitionResult(
                    success=False,
                    error=f"转换前处理失败: {pre_transition_result.error}"
                )
            
            # 3. 执行状态转换
            old_state = self.current_state
            self.current_state = target_state
            
            # 4. 执行转换后处理
            post_transition_result = await self._execute_post_transition(
                old_state, target_state, trigger
            )
            
            # 5. 记录状态历史
            state_change = StateChange(
                transition_id=transition_id,
                from_state=old_state,
                to_state=target_state,
                trigger=trigger,
                timestamp=start_time,
                context=trigger.context
            )
            self.state_history.append(state_change)
            
            # 6. 发布状态变更事件
            await self.publish_message(
                StateChangedEvent(
                    agent_id=self.id,
                    state_change=state_change
                ),
                TopicId("agent.state.changed", str(self.id))
            )
            
            return StateTransitionResult(
                success=True,
                transition_id=transition_id,
                from_state=old_state,
                to_state=target_state,
                duration=(datetime.utcnow() - start_time).total_seconds()
            )
        
        except Exception as e:
            return StateTransitionResult(
                success=False,
                error=f"状态转换异常: {str(e)}"
            )
    
    async def _execute_pre_transition(
        self, 
        from_state: str, 
        to_state: str, 
        trigger: StateTrigger
    ) -> TransitionResult:
        """执行转换前处理"""
        
        handler_key = f"{from_state}_to_{to_state}_pre"
        
        if handler_key in self.transition_handlers:
            handler = self.transition_handlers[handler_key]
            return await handler(trigger)
        
        return TransitionResult(success=True)
    
    async def _execute_post_transition(
        self, 
        from_state: str, 
        to_state: str, 
        trigger: StateTrigger
    ) -> TransitionResult:
        """执行转换后处理"""
        
        handler_key = f"{from_state}_to_{to_state}_post"
        
        if handler_key in self.transition_handlers:
            handler = self.transition_handlers[handler_key]
            return await handler(trigger)
        
        return TransitionResult(success=True)

# 状态机配置示例
order_state_machine_config = StateMachineConfig(
    name="订单状态机",
    initial_state="created",
    states=[
        "created", "paid", "processing", "shipped", "delivered", "cancelled"
    ],
    transitions=[
        StateTransition("created", "paid", "payment_completed"),
        StateTransition("paid", "processing", "start_processing"),
        StateTransition("processing", "shipped", "shipping_confirmed"),
        StateTransition("shipped", "delivered", "delivery_confirmed"),
        StateTransition("created", "cancelled", "order_cancelled"),
        StateTransition("paid", "cancelled", "order_cancelled"),
    ]
)

class OrderManagementAgent(StateMachineAgent):
    """订单管理代理 - 使用状态机管理订单生命周期"""
    
    def __init__(self):
        super().__init__("订单管理", order_state_machine_config)
        
        # 注册转换处理器
        self.transition_handlers.update({
            "created_to_paid_post": self._on_payment_completed,
            "processing_to_shipped_post": self._on_shipping_confirmed,
            "shipped_to_delivered_post": self._on_delivery_confirmed
        })
    
    async def _on_payment_completed(self, trigger: StateTrigger) -> TransitionResult:
        """支付完成后处理"""
        
        # 通知库存系统开始处理
        await self.send_message(
            StartProcessingCommand(order_id=trigger.context['order_id']),
            AgentId("InventoryAgent", "default")
        )
        
        return TransitionResult(success=True)
    
    async def _on_shipping_confirmed(self, trigger: StateTrigger) -> TransitionResult:
        """发货确认后处理"""
        
        # 发送发货通知
        await self.send_message(
            ShippingNotification(
                order_id=trigger.context['order_id'],
                tracking_number=trigger.context['tracking_number']
            ),
            AgentId("NotificationAgent", "default")
        )
        
        return TransitionResult(success=True)

5.2 分布式锁模式

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class DistributedLockAgent(RoutedAgent):
    """分布式锁代理 - 实现跨代理的资源互斥访问"""
    
    def __init__(self):
        super().__init__("分布式锁代理")
        self.locks = {}
        self.lock_waiters = defaultdict(list)
        self.lock_timeouts = {}
    
    @rpc
    async def acquire_lock(
        self, 
        request: LockRequest, 
        ctx: MessageContext
    ) -> LockResult:
        """获取分布式锁"""
        
        lock_key = request.resource_id
        requester = ctx.sender
        timeout = request.timeout or 30.0
        
        # 检查锁是否已被占用
        if lock_key in self.locks:
            current_lock = self.locks[lock_key]
            
            if current_lock.owner == requester:
                # 重入锁
                current_lock.reentrant_count += 1
                return LockResult(
                    success=True,
                    lock_id=current_lock.lock_id,
                    is_reentrant=True
                )
            
            if not request.wait:
                # 不等待,直接返回失败
                return LockResult(
                    success=False,
                    error="资源已被锁定"
                )
            
            # 加入等待队列
            waiter = LockWaiter(
                requester=requester,
                timeout=timeout,
                requested_at=datetime.utcnow()
            )
            self.lock_waiters[lock_key].append(waiter)
            
            # 设置超时
            timeout_task = asyncio.create_task(
                self._handle_lock_timeout(lock_key, requester, timeout)
            )
            
            return LockResult(
                success=True,
                lock_id=f"waiting_{uuid.uuid4().hex[:8]}",
                is_waiting=True
            )
        
        # 创建新锁
        lock_id = str(uuid.uuid4())
        lock = DistributedLock(
            lock_id=lock_id,
            resource_id=lock_key,
            owner=requester,
            acquired_at=datetime.utcnow(),
            ttl=request.ttl or 300  # 默认5分钟TTL
        )
        
        self.locks[lock_key] = lock
        
        # 设置锁过期
        if lock.ttl > 0:
            asyncio.create_task(self._schedule_lock_expiry(lock_key, lock.ttl))
        
        return LockResult(
            success=True,
            lock_id=lock_id
        )
    
    @rpc
    async def release_lock(self, request: LockReleaseRequest, ctx: MessageContext) -> LockReleaseResult:
        """释放分布式锁"""
        
        lock_key = request.resource_id
        requester = ctx.sender
        
        if lock_key not in self.locks:
            return LockReleaseResult(
                success=False,
                error="锁不存在"
            )
        
        current_lock = self.locks[lock_key]
        
        if current_lock.owner != requester:
            return LockReleaseResult(
                success=False,
                error="不是锁的拥有者"
            )
        
        # 处理重入锁
        if current_lock.reentrant_count > 0:
            current_lock.reentrant_count -= 1
            return LockReleaseResult(
                success=True,
                is_reentrant_release=True
            )
        
        # 释放锁
        del self.locks[lock_key]
        
        # 通知等待者
        await self._notify_next_waiter(lock_key)
        
        return LockReleaseResult(success=True)
    
    async def _notify_next_waiter(self, lock_key: str) -> None:
        """通知下一个等待者"""
        
        waiters = self.lock_waiters.get(lock_key, [])
        if not waiters:
            return
        
        # 按先来先服务原则通知
        next_waiter = waiters.pop(0)
        
        # 检查等待者是否超时
        if datetime.utcnow() - next_waiter.requested_at > timedelta(seconds=next_waiter.timeout):
            # 超时,尝试下一个等待者
            await self._notify_next_waiter(lock_key)
            return
        
        # 为等待者创建锁
        lock_id = str(uuid.uuid4())
        lock = DistributedLock(
            lock_id=lock_id,
            resource_id=lock_key,
            owner=next_waiter.requester,
            acquired_at=datetime.utcnow()
        )
        
        self.locks[lock_key] = lock
        
        # 通知等待者锁已获取
        await self.send_message(
            LockAcquiredNotification(
                lock_id=lock_id,
                resource_id=lock_key
            ),
            next_waiter.requester
        )

6. 高性能模式

6.1 批处理聚合模式

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class BatchAggregatorAgent(RoutedAgent):
    """批处理聚合代理 - 提高处理效率的批量模式"""
    
    def __init__(self, batch_config: BatchConfig):
        super().__init__("批处理聚合代理")
        self.batch_size = batch_config.batch_size
        self.batch_timeout = batch_config.batch_timeout
        self.batches = {}
        self.batch_locks = defaultdict(asyncio.Lock)
    
    @message_handler
    async def handle_batchable_request(
        self, 
        request: BatchableRequest, 
        ctx: MessageContext
    ) -> BatchableResponse:
        """处理可批处理的请求"""
        
        batch_key = self._get_batch_key(request)
        
        async with self.batch_locks[batch_key]:
            # 获取或创建批次
            if batch_key not in self.batches:
                self.batches[batch_key] = Batch(
                    key=batch_key,
                    max_size=self.batch_size,
                    timeout=self.batch_timeout
                )
                
                # 启动批次超时处理
                asyncio.create_task(self._handle_batch_timeout(batch_key))
            
            batch = self.batches[batch_key]
            
            # 添加请求到批次
            batch.add_request(request, ctx)
            
            # 检查是否达到批次大小
            if batch.is_full():
                return await self._process_batch(batch_key)
            
            # 等待批次处理
            return await batch.wait_for_result(request.id)
    
    async def _process_batch(self, batch_key: str) -> BatchableResponse:
        """处理整个批次"""
        
        if batch_key not in self.batches:
            return BatchableResponse(success=False, error="批次不存在")
        
        batch = self.batches[batch_key]
        
        try:
            # 1. 聚合所有请求
            aggregated_request = await self._aggregate_requests(batch.requests)
            
            # 2. 批量处理
            batch_result = await self._execute_batch_processing(aggregated_request)
            
            # 3. 分发结果给各个请求
            await self._distribute_results(batch, batch_result)
            
            # 4. 清理批次
            del self.batches[batch_key]
            
            return BatchableResponse(
                success=True,
                batch_size=len(batch.requests),
                processing_time=batch_result.processing_time
            )
        
        except Exception as e:
            # 批处理失败,通知所有等待者
            await self._notify_batch_failure(batch, str(e))
            del self.batches[batch_key]
            
            return BatchableResponse(
                success=False,
                error=f"批处理失败: {str(e)}"
            )
    
    def _get_batch_key(self, request: BatchableRequest) -> str:
        """生成批次键 - 相同类型的请求会被分组"""
        
        # 根据请求类型和某些属性生成批次键
        key_components = [
            request.request_type,
            request.priority,
            request.target_service
        ]
        
        return "_".join(str(component) for component in key_components)
    
    async def _aggregate_requests(self, requests: List[BatchableRequest]) -> AggregatedRequest:
        """聚合多个请求为单个批量请求"""
        
        # 根据请求类型进行不同的聚合策略
        first_request = requests[0]
        
        if first_request.request_type == "database_query":
            # 数据库查询聚合
            return await self._aggregate_database_queries(requests)
        elif first_request.request_type == "api_call":
            # API调用聚合
            return await self._aggregate_api_calls(requests)
        elif first_request.request_type == "computation":
            # 计算任务聚合
            return await self._aggregate_computations(requests)
        else:
            # 默认聚合
            return AggregatedRequest(
                request_type=first_request.request_type,
                items=[req.payload for req in requests],
                metadata={'original_count': len(requests)}
            )

6.2 缓存代理模式

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class IntelligentCacheAgent(RoutedAgent):
    """智能缓存代理 - 实现多层缓存和智能失效策略"""
    
    def __init__(self):
        super().__init__("智能缓存代理")
        self.cache_layers = {
            'l1': InMemoryCache(maxsize=1000, ttl=300),     # L1: 内存缓存
            'l2': RedisCache(host='redis-cluster'),         # L2: Redis缓存
            'l3': DatabaseCache(connection_string='...')    # L3: 数据库缓存
        }
        self.cache_policies = CachePolicyManager()
        self.access_patterns = AccessPatternAnalyzer()
    
    @rpc
    async def get_cached_data(self, request: CacheGetRequest, ctx: MessageContext) -> CacheGetResponse:
        """获取缓存数据 - 智能多层缓存策略"""
        
        cache_key = request.key
        namespace = request.namespace or 'default'
        full_key = f"{namespace}:{cache_key}"
        
        # 1. 分析访问模式
        access_pattern = await self.access_patterns.analyze_access(full_key, ctx.sender)
        
        # 2. 确定缓存策略
        cache_policy = await self.cache_policies.get_policy(full_key, access_pattern)
        
        # 3. 按策略顺序查找缓存
        for layer_name in cache_policy.search_order:
            cache_layer = self.cache_layers[layer_name]
            
            try:
                cached_value = await cache_layer.get(full_key)
                if cached_value is not None:
                    # 缓存命中,执行预热策略
                    await self._execute_warming_strategy(
                        full_key, cached_value, layer_name, cache_policy
                    )
                    
                    return CacheGetResponse(
                        success=True,
                        value=cached_value,
                        hit_layer=layer_name,
                        access_pattern=access_pattern.pattern_type
                    )
            
            except Exception as e:
                logger.warning(f"缓存层 {layer_name} 访问失败: {e}")
                continue
        
        # 所有缓存层都未命中
        return CacheGetResponse(
            success=False,
            cache_miss=True
        )
    
    @rpc
    async def set_cached_data(self, request: CacheSetRequest, ctx: MessageContext) -> CacheSetResponse:
        """设置缓存数据 - 智能分层存储策略"""
        
        full_key = f"{request.namespace or 'default'}:{request.key}"
        
        # 1. 分析数据特征
        data_characteristics = await self._analyze_data_characteristics(request.value)
        
        # 2. 确定存储策略
        storage_policy = await self.cache_policies.get_storage_policy(
            full_key, data_characteristics
        )
        
        # 3. 按策略存储到相应层级
        storage_results = {}
        
        for layer_name, layer_config in storage_policy.layers.items():
            cache_layer = self.cache_layers[layer_name]
            
            try:
                success = await cache_layer.set(
                    key=full_key,
                    value=request.value,
                    ttl=layer_config.ttl
                )
                storage_results[layer_name] = success
            
            except Exception as e:
                logger.error(f"缓存层 {layer_name} 存储失败: {e}")
                storage_results[layer_name] = False
        
        # 4. 更新访问模式
        await self.access_patterns.record_write(full_key, ctx.sender)
        
        success_count = sum(1 for success in storage_results.values() if success)
        
        return CacheSetResponse(
            success=success_count > 0,
            stored_layers=list(storage_results.keys()),
            storage_results=storage_results
        )
    
    async def _execute_warming_strategy(
        self, 
        cache_key: str, 
        value: Any, 
        hit_layer: str, 
        policy: CachePolicy
    ) -> None:
        """执行缓存预热策略"""
        
        warming_strategy = policy.warming_strategies.get(hit_layer)
        if not warming_strategy:
            return
        
        if warming_strategy.type == 'promote_to_faster_layer':
            # 提升到更快的缓存层
            faster_layers = policy.get_faster_layers(hit_layer)
            
            for layer_name in faster_layers:
                cache_layer = self.cache_layers[layer_name]
                try:
                    await cache_layer.set(
                        cache_key, value, ttl=warming_strategy.ttl
                    )
                except Exception as e:
                    logger.warning(f"缓存预热失败 {layer_name}: {e}")
        
        elif warming_strategy.type == 'prefetch_related':
            # 预取相关数据
            related_keys = await warming_strategy.get_related_keys(cache_key)
            
            prefetch_tasks = [
                self._prefetch_data(key, hit_layer)
                for key in related_keys
            ]
            
            await asyncio.gather(*prefetch_tasks, return_exceptions=True)

7. 监控和可观测性模式

7.1 指标收集代理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class MetricsCollectionAgent(RoutedAgent):
    """指标收集代理 - 实现全面的系统监控"""
    
    def __init__(self):
        super().__init__("指标收集代理")
        self.metric_stores = {
            'prometheus': PrometheusMetricStore(),
            'influxdb': InfluxDBMetricStore(),
            'custom': CustomMetricStore()
        }
        self.collection_schedules = {}
        self.alert_rules = AlertRuleEngine()
    
    async def start_metric_collection(self, config: MetricCollectionConfig) -> None:
        """启动指标收集"""
        
        for metric_config in config.metrics:
            # 创建收集任务
            collection_task = asyncio.create_task(
                self._collect_metric_periodically(metric_config)
            )
            
            self.collection_schedules[metric_config.name] = {
                'task': collection_task,
                'config': metric_config
            }
    
    async def _collect_metric_periodically(self, metric_config: MetricConfig) -> None:
        """定期收集指标"""
        
        while True:
            try:
                # 收集指标值
                metric_value = await self._collect_single_metric(metric_config)
                
                # 存储到所有配置的后端
                for store_name in metric_config.stores:
                    store = self.metric_stores[store_name]
                    await store.record_metric(
                        name=metric_config.name,
                        value=metric_value,
                        labels=metric_config.labels,
                        timestamp=datetime.utcnow()
                    )
                
                # 检查告警规则
                await self._check_alert_rules(metric_config.name, metric_value)
                
                # 等待下次收集
                await asyncio.sleep(metric_config.interval)
            
            except Exception as e:
                logger.error(f"收集指标 {metric_config.name} 失败: {e}")
                await asyncio.sleep(5)  # 错误后短暂等待
    
    async def _collect_single_metric(self, config: MetricConfig) -> float:
        """收集单个指标"""
        
        if config.type == 'agent_count':
            return await self._count_active_agents()
        elif config.type == 'message_rate':
            return await self._calculate_message_rate()
        elif config.type == 'response_time':
            return await self._measure_average_response_time()
        elif config.type == 'error_rate':
            return await self._calculate_error_rate()
        elif config.type == 'memory_usage':
            return await self._get_memory_usage()
        elif config.type == 'cpu_usage':
            return await self._get_cpu_usage()
        elif config.type == 'custom':
            # 自定义指标收集
            return await self._collect_custom_metric(config)
        else:
            raise ValueError(f"不支持的指标类型: {config.type}")

8. 总结

8.1 模式选择指南

使用场景推荐模式主要优势
复杂业务流程微服务代理模式模块化、可扩展
状态驱动流程状态机代理模式清晰的状态管理
数据处理流水线管道模式可组合、易维护
高并发场景代理池模式性能优化、资源复用
事件驱动架构观察者模式松耦合、可扩展
批量处理批处理聚合模式高吞吐、效率优化

8.2 性能优化建议

  1. 合理使用批处理: 在高并发场景下使用批处理模式
  2. 智能缓存策略: 根据访问模式选择合适的缓存层级
  3. 代理池管理: 对频繁使用的代理类型实施池化管理
  4. 异步优先: 所有I/O操作都应该是异步的
  5. 资源管理: 实现适当的资源清理和生命周期管理

8.3 架构演进建议

  1. 从简单开始: 先实现基本功能,再逐步添加高级模式
  2. 模式组合: 可以组合使用多种模式解决复杂问题
  3. 监控驱动: 基于监控数据决定是否引入复杂模式
  4. 渐进式重构: 逐步将现有代码重构为高级模式
  5. 文档先行: 为复杂模式编写详细的文档和示例

通过掌握这些高级模式,开发者可以构建出更加健壮、高效、可维护的企业级多代理系统。