1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
| /// <summary>
/// gRPC消息路由器 - 处理客户端和服务器之间的消息路由
/// </summary>
/// <remarks>
/// Owns a single bidirectional streaming call opened through <c>AgentRpcClient.OpenChannel</c>.
/// Outgoing RPC requests and CloudEvents are written to the call's request stream; incoming
/// messages are read by a background loop and are either matched to a pending request or
/// handed to the message sink. Writes are serialized because a gRPC request stream permits
/// only one pending write at a time.
/// </remarks>
public class GrpcMessageRouter : IAsyncDisposable
{
    private readonly AgentRpc.AgentRpcClient _client;
    private readonly IMessageSink<Message> _messageSink;
    private readonly string _clientId;
    private readonly ILogger<GrpcMessageRouter> _logger;
    private readonly CancellationToken _shutdownToken;

    // The open duplex call, or null while the router is stopped.
    // Start/stop transitions are guarded by _channelSemaphore.
    private AsyncDuplexStreamingCall<Message, Message>? _channel;

    // RPC requests that are still waiting for a response, keyed by request id.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests;

    // Serializes StartAsync/StopAsync.
    private readonly SemaphoreSlim _channelSemaphore;

    // Serializes writes to the request stream (one pending write at a time).
    private readonly SemaphoreSlim _writeSemaphore;

    // 0 until DisposeAsync has run once; makes disposal idempotent.
    private int _disposed;

    /// <summary>
    /// Creates a router that is not yet connected; call <see cref="StartAsync"/> to open the stream.
    /// </summary>
    /// <param name="client">gRPC client used to open the channel and issue unary calls.</param>
    /// <param name="messageSink">Receiver for incoming requests and CloudEvents.</param>
    /// <param name="clientId">Identifier of this client (stored, not otherwise used in this class).</param>
    /// <param name="logger">Logger for lifecycle and error messages.</param>
    /// <param name="shutdownToken">Token that cancels the stream, the receive loop and unary calls.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="client"/>, <paramref name="messageSink"/> or <paramref name="logger"/> is null.
    /// </exception>
    public GrpcMessageRouter(
        AgentRpc.AgentRpcClient client,
        IMessageSink<Message> messageSink,
        string clientId,
        ILogger<GrpcMessageRouter> logger,
        CancellationToken shutdownToken)
    {
        _client = client ?? throw new ArgumentNullException(nameof(client));
        _messageSink = messageSink ?? throw new ArgumentNullException(nameof(messageSink));
        _clientId = clientId;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _shutdownToken = shutdownToken;
        _pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>>();
        _channelSemaphore = new SemaphoreSlim(1, 1);
        _writeSemaphore = new SemaphoreSlim(1, 1);
    }

    /// <summary>
    /// Starts the router: opens the duplex stream and launches the receive loop.
    /// Does nothing if the router is already started.
    /// </summary>
    public async Task StartAsync()
    {
        await _channelSemaphore.WaitAsync(_shutdownToken);
        try
        {
            if (_channel != null)
            {
                return; // Already started.
            }

            _logger.LogInformation("启动 gRPC 消息路由器");

            // Open the bidirectional streaming call.
            var channel = _client.OpenChannel(cancellationToken: _shutdownToken);
            _channel = channel;

            // Hand the loop its own reference so it never observes _channel being reset by StopAsync.
            _ = Task.Run(() => ReceiveMessagesLoop(channel), _shutdownToken);

            _logger.LogInformation("gRPC 消息路由器已启动");
        }
        finally
        {
            _channelSemaphore.Release();
        }
    }

    /// <summary>
    /// Stops the router: completes the request stream, cancels pending requests and disposes the call.
    /// Does nothing if the router is not started.
    /// </summary>
    public async Task StopAsync()
    {
        await _channelSemaphore.WaitAsync();
        try
        {
            var channel = _channel;
            if (channel == null)
            {
                return;
            }

            _logger.LogInformation("停止 gRPC 消息路由器");
            try
            {
                // Half-close the sending side of the stream.
                await channel.RequestStream.CompleteAsync();
            }
            finally
            {
                // Runs even if CompleteAsync throws, so waiters are released and the call is not leaked.
                // Only entries actually removed here are cancelled; anything added concurrently is
                // left for its owner to clean up.
                foreach (var requestId in _pendingRequests.Keys)
                {
                    if (_pendingRequests.TryRemove(requestId, out var pending))
                    {
                        pending.TrySetCanceled();
                    }
                }

                channel.Dispose();
                _channel = null;
            }

            _logger.LogInformation("gRPC 消息路由器已停止");
        }
        finally
        {
            _channelSemaphore.Release();
        }
    }

    /// <summary>
    /// Sends an RPC request to <paramref name="target"/> and waits for the matching response.
    /// </summary>
    /// <param name="message">Payload to serialize into the request.</param>
    /// <param name="target">Agent that should handle the request.</param>
    /// <param name="source">Optional sending agent.</param>
    /// <param name="messageId">Request id; a new GUID is generated when null.</param>
    /// <param name="cancellationToken">Cancels both the write and the wait for the response.</param>
    /// <returns>The deserialized response payload.</returns>
    /// <exception cref="InvalidOperationException">
    /// The router is not started, a request with the same id is already pending,
    /// or the response carries a non-empty error.
    /// </exception>
    public async ValueTask<object?> SendRpcRequestAsync(
        object message,
        AgentId target,
        AgentId? source = null,
        string? messageId = null,
        CancellationToken cancellationToken = default)
    {
        var request = new RpcRequest
        {
            RequestId = messageId ?? Guid.NewGuid().ToString(),
            Target = new Protobuf.AgentId { Type = target.Type, Key = target.Key },
            Method = "HandleAsync",
            Payload = SerializeMessage(message)
        };
        if (source.HasValue)
        {
            request.Source = new Protobuf.AgentId { Type = source.Value.Type, Key = source.Value.Key };
        }

        // Run continuations asynchronously so the awaiting caller does not execute inline
        // on the thread that delivers the response.
        var tcs = new TaskCompletionSource<RpcResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

        // A duplicate id would leave this caller waiting on a source nobody completes,
        // and its cleanup would remove the other caller's entry.
        if (!_pendingRequests.TryAdd(request.RequestId, tcs))
        {
            throw new InvalidOperationException($"已存在相同请求ID的待处理请求: {request.RequestId}");
        }

        try
        {
            // Send the request.
            await WriteMessageAsync(new Message { Request = request }, cancellationToken);

            // Wait for the response.
            var response = await tcs.Task.WaitAsync(cancellationToken);
            if (!string.IsNullOrEmpty(response.Error))
            {
                throw new InvalidOperationException($"远程代理错误: {response.Error}");
            }

            return DeserializeMessage(response.Payload);
        }
        finally
        {
            _pendingRequests.TryRemove(request.RequestId, out _);
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> as a CloudEvent on <paramref name="topic"/>.
    /// </summary>
    /// <param name="message">Event payload.</param>
    /// <param name="topic">Topic supplying the event type and source.</param>
    /// <param name="source">Optional sending agent (not used by this method).</param>
    /// <param name="messageId">Event id; a new GUID is generated when null.</param>
    /// <param name="cancellationToken">Cancels the write.</param>
    /// <exception cref="InvalidOperationException">The router is not started.</exception>
    public async ValueTask PublishCloudEventAsync(
        object message,
        TopicId topic,
        AgentId? source = null,
        string? messageId = null,
        CancellationToken cancellationToken = default)
    {
        var cloudEvent = new CloudEvent
        {
            Id = messageId ?? Guid.NewGuid().ToString(),
            Type = topic.Type,
            Source = topic.Source,
            SpecVersion = "1.0",
            Time = Timestamp.FromDateTime(DateTime.UtcNow),
            // NOTE(review): SerializeToAny is not defined in the visible part of this class — confirm it exists.
            Data = Google.Protobuf.WellKnownTypes.Any.Pack(SerializeToAny(message))
        };

        await WriteMessageAsync(new Message { CloudEvent = cloudEvent }, cancellationToken);
    }

    /// <summary>
    /// Registers an agent type with the server.
    /// </summary>
    /// <param name="agentType">Name of the agent type to register.</param>
    public async Task RegisterAgentTypeAsync(string agentType)
    {
        try
        {
            var request = new RegisterAgentTypeRequest { Type = agentType };
            await _client.RegisterAgentAsync(request, cancellationToken: _shutdownToken);
            _logger.LogInformation("成功注册代理类型: {AgentType}", agentType);
        }
        catch (Exception ex)
        {
            // Logged for diagnostics, then rethrown so the caller still sees the failure.
            _logger.LogError(ex, "注册代理类型失败: {AgentType}", agentType);
            throw;
        }
    }

    /// <summary>
    /// Adds a subscription on the server.
    /// </summary>
    /// <param name="subscription">Subscription to add.</param>
    public async Task AddSubscriptionAsync(Subscription subscription)
    {
        try
        {
            var request = new AddSubscriptionRequest { Subscription = subscription };
            await _client.AddSubscriptionAsync(request, cancellationToken: _shutdownToken);
            _logger.LogInformation("成功添加订阅: {SubscriptionId}", subscription.Id);
        }
        catch (Exception ex)
        {
            // Logged for diagnostics, then rethrown so the caller still sees the failure.
            _logger.LogError(ex, "添加订阅失败: {SubscriptionId}", subscription.Id);
            throw;
        }
    }

    /// <summary>
    /// Writes one message to the request stream, waiting for any write already in progress.
    /// </summary>
    /// <exception cref="InvalidOperationException">The router is not started.</exception>
    private async Task WriteMessageAsync(Message message, CancellationToken cancellationToken)
    {
        await _writeSemaphore.WaitAsync(cancellationToken);
        try
        {
            var channel = _channel ?? throw new InvalidOperationException("gRPC 消息路由器尚未启动");
            await channel.RequestStream.WriteAsync(message, cancellationToken);
        }
        finally
        {
            _writeSemaphore.Release();
        }
    }

    /// <summary>
    /// Reads messages from the response stream until it ends, is cancelled, or fails.
    /// </summary>
    /// <param name="channel">The call whose response stream is read.</param>
    private async Task ReceiveMessagesLoop(AsyncDuplexStreamingCall<Message, Message> channel)
    {
        try
        {
            await foreach (var message in channel.ResponseStream.ReadAllAsync(_shutdownToken))
            {
                // Each message is processed on its own task, so a slow handler does not block reading.
                _ = Task.Run(() => ProcessIncomingMessage(message), _shutdownToken);
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("消息接收循环已取消");
        }
        catch (Exception ex)
        {
            // NOTE(review): pending requests are not completed here; their callers wait
            // until their own cancellation token fires or StopAsync runs.
            _logger.LogError(ex, "消息接收循环发生错误");
        }
    }

    /// <summary>
    /// Dispatches one incoming message: responses complete a pending request,
    /// requests and CloudEvents go to the message sink. Exceptions are logged, not rethrown.
    /// </summary>
    private async Task ProcessIncomingMessage(Message message)
    {
        try
        {
            switch (message.MessageCase)
            {
                case Message.MessageOneofCase.Response:
                    HandleRpcResponse(message.Response);
                    break;
                case Message.MessageOneofCase.Request:
                case Message.MessageOneofCase.CloudEvent:
                    await _messageSink.HandleMessageAsync(message, _shutdownToken);
                    break;
                default:
                    _logger.LogWarning("收到未知消息类型: {MessageCase}", message.MessageCase);
                    break;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理收到消息时发生错误");
        }
    }

    /// <summary>
    /// Completes the pending request that matches the response id, or logs a warning if none is waiting.
    /// </summary>
    private void HandleRpcResponse(RpcResponse response)
    {
        if (_pendingRequests.TryRemove(response.RequestId, out var tcs))
        {
            tcs.TrySetResult(response);
        }
        else
        {
            _logger.LogWarning("收到未期望的RPC响应: {RequestId}", response.RequestId);
        }
    }

    // Serialization and deserialization methods...
    private Payload SerializeMessage(object message) { /* implementation omitted */ }
    private object? DeserializeMessage(Payload payload) { /* implementation omitted */ }

    /// <summary>
    /// Stops the router and releases the semaphores. Calls after the first are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            await StopAsync();
        }
        finally
        {
            // Released even when StopAsync throws.
            _channelSemaphore.Dispose();
            _writeSemaphore.Dispose();
        }
    }
}
|