1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12
13use super::{
14 Common::TransportType,
15 TransportConfig::TransportConfig,
16 TransportError::TransportError,
17 UnifiedRequest::UnifiedRequest,
18 UnifiedResponse::UnifiedResponse,
19};
20
21#[async_trait]
27pub trait TransportStrategy: Send + Sync {
28 async fn Connect(&mut self) -> Result<(), TransportError>;
30
31 async fn Disconnect(&mut self) -> Result<(), TransportError>;
33
34 async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError>;
36
37 async fn SendNotification(&mut self, Notification:UnifiedRequest) -> Result<(), TransportError>;
39
40 fn StreamEvents(&self)
42 -> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError>;
43
44 fn IsConnected(&self) -> bool;
46
47 fn LatencyMilliseconds(&self) -> u64;
49
50 fn TransportKind(&self) -> TransportType;
52
53 fn Configuration(&self) -> &TransportConfig;
55
56 fn SupportsStreaming(&self) -> bool;
58
59 fn Capabilities(&self) -> TransportCapabilities;
61
62 fn Metrics(&self) -> TransportMetrics;
64}
65
/// Static description of what a transport can do.
///
/// The type is a small `Copy` value made of limits and feature flags.
/// [`Default`] yields a conservative profile: request/response and
/// notifications enabled, every other feature flag off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransportCapabilities {
    /// Upper bound on the size of a single message.
    // NOTE(review): presumably measured in bytes - confirm with the transports.
    pub MaximumMessageSize:usize,

    /// Request/response exchanges are supported.
    pub SupportsRequestResponse:bool,

    /// Server-side streaming is supported.
    pub SupportsServerStreaming:bool,

    /// Client-side streaming is supported.
    pub SupportsClientStreaming:bool,

    /// Bidirectional streaming is supported.
    pub SupportsBidirectionalStreaming:bool,

    /// Notifications are supported.
    pub SupportsNotifications:bool,

    /// Upper bound on concurrency.
    // NOTE(review): presumably concurrent in-flight requests - confirm.
    pub MaximumConcurrent:usize,

    /// The transport needs network access.
    pub RequiresNetwork:bool,

    /// Encryption is supported.
    pub SupportsEncryption:bool,

    /// Compression is supported.
    pub SupportsCompression:bool,
}

impl Default for TransportCapabilities {
    /// Builds the baseline profile: a `1024 * 1024` message-size limit, a
    /// concurrency limit of `100`, request/response and notifications
    /// enabled, and all remaining flags disabled.
    fn default() -> Self {
        // 1024 * 1024, spelled as a named constant.
        const DEFAULT_MAXIMUM_MESSAGE_SIZE:usize = 1 << 20;

        const DEFAULT_MAXIMUM_CONCURRENT:usize = 100;

        Self {
            // Limits.
            MaximumMessageSize:DEFAULT_MAXIMUM_MESSAGE_SIZE,
            MaximumConcurrent:DEFAULT_MAXIMUM_CONCURRENT,

            // Features that are on by default.
            SupportsRequestResponse:true,
            SupportsNotifications:true,

            // Features that are off by default.
            SupportsServerStreaming:false,
            SupportsClientStreaming:false,
            SupportsBidirectionalStreaming:false,
            RequiresNetwork:false,
            SupportsEncryption:false,
            SupportsCompression:false,
        }
    }
}
124
/// Plain-value counters and gauges describing a transport's activity.
///
/// The value derived by [`Default`] has every counter at zero and no latency
/// samples. The methods on this type only touch the request counters, the
/// latency aggregate and the circuit-breaker word; all other fields are
/// written directly by whoever owns the value.
#[derive(Debug, Clone, Default)]
pub struct TransportMetrics {
    /// Requests recorded so far; bumped by both `IncrementRequestSuccess` and
    /// `IncrementRequestFailure`.
    pub RequestsTotal:u64,

    /// Requests recorded through `IncrementRequestSuccess`.
    pub RequestsSuccessful:u64,

    /// Requests recorded through `IncrementRequestFailure`.
    pub RequestsFailed:u64,

    /// Notifications sent (maintained by the owner of the value).
    pub NotificationsSent:u64,

    /// Connections established (maintained by the owner of the value).
    pub ConnectionsEstablished:u64,

    /// Failed connection attempts (maintained by the owner of the value).
    pub ConnectionFailures:u64,

    /// Bytes sent (maintained by the owner of the value).
    pub BytesSent:u64,

    /// Bytes received (maintained by the owner of the value).
    pub BytesReceived:u64,

    /// Circuit-breaker word. The low 16 bits hold the state code written by
    /// `SetCircuitBreakerState` (`0` closed, `1` open, `2` half-open); the
    /// high 16 bits are left untouched by that method.
    pub CircuitBreakerState:u32,

    /// Running latency aggregate in milliseconds, laid out as
    /// `(sample count, sum of samples, sum of squared samples)`.
    /// `None` until `RecordLatency` is called for the first time.
    pub LatencyMillisecondsHistogram:Option<(u64, f64, f64)>,

    /// Currently active connections (maintained by the owner of the value).
    pub ActiveConnections:u32,

    /// Requests currently pending (maintained by the owner of the value).
    pub PendingRequests:u32,
}

impl TransportMetrics {
    /// Creates a zeroed metrics record; equivalent to `Self::default()`.
    pub fn New() -> Self { Self::default() }

    /// Overwrites every field with its zeroed default.
    pub fn Reset(&mut self) { *self = Self::New(); }

    /// Share of successful requests as a percentage in `0.0..=100.0`.
    ///
    /// Returns `None` while no request has been recorded, so callers never
    /// see a division by zero.
    pub fn SuccessRate(&self) -> Option<f64> {
        if self.RequestsTotal == 0 {
            return None;
        }

        let Ratio = self.RequestsSuccessful as f64 / self.RequestsTotal as f64;

        Some(Ratio * 100.0)
    }

    /// Arithmetic mean of the recorded latencies, in milliseconds.
    ///
    /// Returns `None` when no sample has been recorded (including the case of
    /// an aggregate whose sample count is zero).
    pub fn AverageLatency(&self) -> Option<f64> {
        let (Count, Sum, _) = self.LatencyMillisecondsHistogram?;

        if Count == 0 { None } else { Some(Sum / Count as f64) }
    }

    /// Estimates the 95th-percentile latency, in milliseconds.
    ///
    /// The estimate is `mean + 1.645 * standard deviation`, i.e. the
    /// one-sided 95% point of a normal distribution fitted to the running
    /// aggregate. Returns `None` when fewer than 20 samples were recorded.
    pub fn LatencyPercentile95(&self) -> Option<f64> {
        // The aggregate stores the running SUM in its second slot, not the
        // mean, so the mean has to be derived before computing the variance.
        let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram?;

        if Count < 20 {
            return None;
        }

        let Samples = Count as f64;

        let Mean = Sum / Samples;

        // E[x^2] - E[x]^2 can dip marginally below zero through rounding when
        // the samples are (nearly) identical; clamp so `sqrt` cannot yield NaN.
        let Variance = (SumSquared / Samples - Mean * Mean).max(0.0);

        Some(Mean + 1.645 * Variance.sqrt())
    }

    /// Folds one latency sample (milliseconds) into the running aggregate,
    /// creating the aggregate on first use.
    pub fn RecordLatency(&mut self, LatencyMilliseconds:f64) {
        let Aggregate = self.LatencyMillisecondsHistogram.get_or_insert((0, 0.0, 0.0));

        Aggregate.0 += 1;
        Aggregate.1 += LatencyMilliseconds;
        Aggregate.2 += LatencyMilliseconds * LatencyMilliseconds;
    }

    /// Records one successful request.
    pub fn IncrementRequestSuccess(&mut self) {
        self.RequestsTotal += 1;

        self.RequestsSuccessful += 1;
    }

    /// Records one failed request.
    pub fn IncrementRequestFailure(&mut self) {
        self.RequestsTotal += 1;

        self.RequestsFailed += 1;
    }

    /// Stores `State` in the low 16 bits of `CircuitBreakerState`, keeping
    /// the high 16 bits as they were.
    ///
    /// Codes follow the enum's declaration order: closed `0`, open `1`,
    /// half-open `2`. A zeroed (default / reset) record therefore reads as a
    /// closed breaker rather than an open one.
    pub fn SetCircuitBreakerState(&mut self, State:CircuitBreakerState) {
        let StateCode:u32 = match State {
            CircuitBreakerState::Closed => 0,

            CircuitBreakerState::Open => 1,

            CircuitBreakerState::HalfOpen => 2,
        };

        let PreservedHighBits = self.CircuitBreakerState & 0xFFFF_0000;

        self.CircuitBreakerState = PreservedHighBits | StateCode;
    }
}

/// Circuit-breaker state accepted by `TransportMetrics::SetCircuitBreakerState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
    /// Stored as state code `0`.
    Closed,

    /// Stored as state code `1`.
    Open,

    /// Stored as state code `2`.
    HalfOpen,
}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
260#[repr(u16)]
261pub enum TransportErrorCode {
262 ConnectionFailed = 100,
264
265 Timeout = 101,
267
268 NotFound = 102,
270
271 InvalidRequest = 103,
273
274 RemoteError = 104,
276
277 MessageTooLarge = 105,
279
280 EncryptionError = 106,
282
283 SerializationError = 107,
285
286 Unauthorized = 108,
288
289 RateLimited = 109,
291
292 NotSupported = 110,
294
295 InternalError = 111,
297
298 CircuitBreakerOpen = 112,
300
301 StreamError = 113,
303
304 ConfigurationError = 114,
306}
307
308impl TransportErrorCode {
309 pub fn IsRetryable(&self) -> bool {
311 matches!(
312 self,
313 TransportErrorCode::ConnectionFailed
314 | TransportErrorCode::Timeout
315 | TransportErrorCode::RateLimited
316 | TransportErrorCode::RemoteError
317 )
318 }
319
320 pub fn RecommendedRetryDelayMilliseconds(&self) -> u64 {
322 match self {
323 TransportErrorCode::ConnectionFailed => 1000,
324
325 TransportErrorCode::Timeout => 500,
326
327 TransportErrorCode::RateLimited => 2000,
328
329 TransportErrorCode::RemoteError => 300,
330
331 _ => 0,
332 }
333 }
334}
335
#[cfg(test)]
mod tests {

    use super::*;

    /// Checks a sample of codes on each side of the retryable boundary.
    #[test]
    fn TestRetryableErrorCodes() {
        let ExpectedRetryable = [
            TransportErrorCode::ConnectionFailed,
            TransportErrorCode::Timeout,
            TransportErrorCode::RateLimited,
        ];

        let ExpectedFinal = [TransportErrorCode::InvalidRequest, TransportErrorCode::NotFound];

        for Code in ExpectedRetryable.iter() {
            assert!(Code.IsRetryable());
        }

        for Code in ExpectedFinal.iter() {
            assert!(!Code.IsRetryable());
        }
    }

    /// Checks the recommended delay for a sample of codes.
    #[test]
    fn TestErrorRecommendedDelays() {
        let ExpectedDelays:[(TransportErrorCode, u64); 4] = [
            (TransportErrorCode::ConnectionFailed, 1000),
            (TransportErrorCode::Timeout, 500),
            (TransportErrorCode::RateLimited, 2000),
            (TransportErrorCode::InvalidRequest, 0),
        ];

        for (Code, Delay) in ExpectedDelays.iter() {
            assert_eq!(Code.RecommendedRetryDelayMilliseconds(), *Delay);
        }
    }
}