CommonLibrary/Transport/Registry/
mod.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::{
9 collections::{HashMap, HashSet},
10 sync::Arc,
11 time::Duration,
12};
13
14use super::{
15 Common::{DefaultTransportTypeDetector, TransportType, TransportTypeDetector},
16 TransportError::TransportError,
17 TransportStrategy::{TransportMetrics, TransportStrategy as CommonTransportStrategy},
18};
19
20pub struct TransportSelector {
22 #[allow(dead_code)]
24 EnvironmentDetector:Box<dyn TransportTypeDetector + Send + Sync>,
25
26 PriorityOrder:Vec<TransportType>,
28}
29
30impl TransportSelector {
31 pub fn New() -> Self {
33 Self {
34 EnvironmentDetector:Box::new(DefaultTransportTypeDetector),
35
36 PriorityOrder:Self::DefaultPriorityOrder(),
37 }
38 }
39
40 pub fn WithDetector(Detector:Box<dyn TransportTypeDetector + Send + Sync>) -> Self {
42 Self { EnvironmentDetector:Detector, PriorityOrder:Self::DefaultPriorityOrder() }
43 }
44
45 fn DefaultPriorityOrder() -> Vec<TransportType> {
47 let mut Order = Vec::new();
48
49 #[cfg(target_arch = "wasm32")]
50 {
51 Order.push(TransportType::Wasm);
52
53 Order.push(TransportType::Grpc);
54 }
55
56 #[cfg(not(target_arch = "wasm32"))]
57 {
58 Order.push(TransportType::Ipc);
59
60 Order.push(TransportType::Grpc);
61 }
62
63 Order
64 }
65
66 pub fn SelectBest(&self, Context:&TransportContext) -> Result<String, TransportError> {
68 let mut Candidates = Vec::new();
69
70 for TransportKind in self.PriorityOrder.iter() {
71 if !Context.TransportAvailable(*TransportKind) {
72 continue;
73 }
74
75 if !Context.IsAllowed(*TransportKind) {
76 continue;
77 }
78
79 let Score = self.CalculateScore(*TransportKind, Context);
80
81 Candidates.push((*TransportKind, Score));
82 }
83
84 Candidates.sort_by(|Left, Right| Right.1.total_cmp(&Left.1));
85
86 Candidates
87 .first()
88 .map(|(TransportKind, _)| TransportKind.AsString().to_string())
89 .ok_or_else(|| TransportError::NotFound("No suitable transport available for current context"))
90 }
91
92 fn CalculateScore(&self, TransportKind:TransportType, Context:&TransportContext) -> f64 {
94 let mut Score = 0.0;
95
96 if let Some(Position) = self.PriorityOrder.iter().position(|Kind| *Kind == TransportKind) {
97 Score += (self.PriorityOrder.len() - Position) as f64 * 10.0;
98 }
99
100 let Environment = Context.Environment();
101
102 match (Environment.IsWeb, TransportKind) {
103 (true, TransportType::Wasm) => Score += 50.0,
104
105 (false, TransportType::Ipc) => Score += 40.0,
106
107 _ => {},
108 }
109
110 let Requirements = Context.Requirements();
111
112 if Requirements.StreamingRequired {
113 match TransportKind {
114 TransportType::Grpc => Score += 30.0,
115
116 TransportType::Wasm => Score += 20.0,
117
118 TransportType::Ipc => Score -= 20.0,
119
120 TransportType::Unknown => {},
121 }
122 }
123
124 if Requirements.CrossNetwork {
125 match TransportKind {
126 TransportType::Grpc => Score += 50.0,
127
128 TransportType::Ipc => Score -= 50.0,
129
130 TransportType::Wasm => Score += 10.0,
131
132 TransportType::Unknown => {},
133 }
134 }
135
136 Score += match Requirements.Performance {
137 PerformanceLevel::Critical => {
138 match TransportKind {
139 TransportType::Ipc => 40.0,
140
141 TransportType::Grpc => 20.0,
142
143 TransportType::Wasm => 0.0,
144
145 TransportType::Unknown => 0.0,
146 }
147 },
148
149 PerformanceLevel::High => {
150 match TransportKind {
151 TransportType::Ipc => 30.0,
152
153 TransportType::Grpc => 20.0,
154
155 TransportType::Wasm => 10.0,
156
157 TransportType::Unknown => 0.0,
158 }
159 },
160
161 PerformanceLevel::Medium => 10.0,
162
163 PerformanceLevel::Low => 0.0,
164 };
165
166 if let Some(MaximumLatency) = Requirements.MaximumLatencyMilliseconds {
167 let EstimatedLatency = self.EstimateLatencyMilliseconds(TransportKind);
168
169 if EstimatedLatency <= MaximumLatency {
170 Score += 20.0;
171 } else {
172 Score -= 30.0;
173 }
174 }
175
176 Score
177 }
178
179 fn EstimateLatencyMilliseconds(&self, TransportKind:TransportType) -> u64 {
181 match TransportKind {
182 TransportType::Ipc => 1,
183
184 TransportType::Grpc => 5,
185
186 TransportType::Wasm => 20,
187
188 TransportType::Unknown => u64::MAX,
189 }
190 }
191}
192
193impl Default for TransportSelector {
194 fn default() -> Self { Self::New() }
195}
196
197#[derive(Debug, Clone)]
199pub struct TransportContext {
200 EnvironmentInfo:EnvironmentInfo,
201
202 RequirementsInfo:TransportRequirements,
203
204 ConstraintsInfo:TransportConstraints,
205
206 AvailableTransports:HashSet<TransportType>,
207}
208
209impl TransportContext {
210 pub fn New(
212 EnvironmentInfo:EnvironmentInfo,
213
214 RequirementsInfo:TransportRequirements,
215
216 ConstraintsInfo:TransportConstraints,
217 ) -> Self {
218 let AvailableTransports = DefaultTransportTypeDetector::list_available_transports().into_iter().collect();
219
220 Self { EnvironmentInfo, RequirementsInfo, ConstraintsInfo, AvailableTransports }
221 }
222
223 pub fn Detect() -> Self {
226 let EnvironmentInfo = DefaultTransportTypeDetector::DetectEnvironment();
227
228 let RequirementsInfo = TransportRequirements::default();
229
230 let ConstraintsInfo = TransportConstraints::default();
231
232 Self::New(EnvironmentInfo, RequirementsInfo, ConstraintsInfo)
233 }
234
235 pub fn Environment(&self) -> &EnvironmentInfo { &self.EnvironmentInfo }
237
238 pub fn Requirements(&self) -> &TransportRequirements { &self.RequirementsInfo }
240
241 pub fn Constraints(&self) -> &TransportConstraints { &self.ConstraintsInfo }
243
244 pub fn TransportAvailable(&self, TransportKind:TransportType) -> bool {
246 self.AvailableTransports.contains(&TransportKind)
247 }
248
249 pub fn IsAllowed(&self, TransportKind:TransportType) -> bool {
251 if self.ConstraintsInfo.ForbiddenTransports.contains(&TransportKind) {
252 return false;
253 }
254
255 if self.ConstraintsInfo.AllowedTransports.is_empty() {
256 true
257 } else {
258 self.ConstraintsInfo.AllowedTransports.contains(&TransportKind)
259 }
260 }
261
262 pub fn WithAvailableTransports(mut self, Transports:Vec<TransportType>) -> Self {
264 self.AvailableTransports = Transports.into_iter().collect();
265
266 self
267 }
268}
269
270#[derive(Debug, Clone)]
272pub struct EnvironmentInfo {
273 pub Platform:Platform,
275
276 pub IsWeb:bool,
278
279 pub IsDesktop:bool,
281
282 pub BrowserCapabilities:Option<BrowserCapabilities>,
284}
285
286impl EnvironmentInfo {
287 pub fn New(Platform:Platform, IsWeb:bool, IsDesktop:bool, BrowserCapabilities:Option<BrowserCapabilities>) -> Self {
289 Self { Platform, IsWeb, IsDesktop, BrowserCapabilities }
290 }
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
295pub enum Platform {
296 Windows,
297
298 MacOS,
299
300 Linux,
301
302 Browser,
303
304 Mobile,
305
306 Unknown,
307}
308
309impl Platform {
310 pub fn Current() -> Self {
312 #[cfg(target_os = "windows")]
313 return Self::Windows;
314
315 #[cfg(target_os = "macos")]
316 return Self::MacOS;
317
318 #[cfg(target_os = "linux")]
319 return Self::Linux;
320
321 #[cfg(all(not(target_os = "windows"), not(target_os = "macos"), not(target_os = "linux")))]
322 return Self::Unknown;
323 }
324}
325
326#[derive(Debug, Clone)]
328pub struct BrowserCapabilities {
329 pub WasmSupported:bool,
330
331 pub WebWorkerSupported:bool,
332
333 pub WebSocketSupported:bool,
334
335 pub SharedArrayBufferSupported:bool,
336}
337
338impl Default for BrowserCapabilities {
339 fn default() -> Self {
340 Self {
341 WasmSupported:cfg!(target_arch = "wasm32"),
342
343 WebWorkerSupported:false,
344
345 WebSocketSupported:false,
346
347 SharedArrayBufferSupported:false,
348 }
349 }
350}
351
352#[derive(Debug, Clone)]
354pub struct TransportRequirements {
355 pub StreamingRequired:bool,
357
358 pub CrossProcess:bool,
360
361 pub CrossNetwork:bool,
363
364 pub Performance:PerformanceLevel,
366
367 pub Reliability:ReliabilityLevel,
369
370 pub MaximumLatencyMilliseconds:Option<u64>,
372}
373
374impl Default for TransportRequirements {
375 fn default() -> Self {
376 Self {
377 StreamingRequired:false,
378
379 CrossProcess:false,
380
381 CrossNetwork:false,
382
383 Performance:PerformanceLevel::Medium,
384
385 Reliability:ReliabilityLevel::Medium,
386
387 MaximumLatencyMilliseconds:None,
388 }
389 }
390}
391
392#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
394pub enum PerformanceLevel {
395 Low = 1,
396
397 Medium = 2,
398
399 High = 3,
400
401 Critical = 4,
402}
403
404#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
406pub enum ReliabilityLevel {
407 Low = 1,
408
409 Medium = 2,
410
411 High = 3,
412
413 Critical = 4,
414}
415
416#[derive(Debug, Clone)]
418pub struct TransportConstraints {
419 pub AllowedTransports:Vec<TransportType>,
421
422 pub ForbiddenTransports:Vec<TransportType>,
424
425 pub MaximumLatencyMilliseconds:Option<u64>,
427
428 pub MaximumBandwidthBytesPerSecond:Option<u64>,
430}
431
432impl Default for TransportConstraints {
433 fn default() -> Self {
434 Self {
435 AllowedTransports:Vec::new(),
436
437 ForbiddenTransports:Vec::new(),
438
439 MaximumLatencyMilliseconds:None,
440
441 MaximumBandwidthBytesPerSecond:None,
442 }
443 }
444}
445
446pub struct TransportRegistry {
448 Transports:HashMap<String, Arc<dyn CommonTransportStrategy>>,
450
451 Active:Option<String>,
453
454 Selector:TransportSelector,
456}
457
458impl TransportRegistry {
459 pub fn New() -> Self { Self { Transports:HashMap::new(), Active:None, Selector:TransportSelector::New() } }
461
462 pub fn WithSelector(Selector:TransportSelector) -> Self {
464 Self { Transports:HashMap::new(), Active:None, Selector }
465 }
466
467 pub fn Register(&mut self, Name:String, Transport:Arc<dyn CommonTransportStrategy>) {
469 log::info!("Registering transport: {}", Name);
470
471 self.Transports.insert(Name, Transport);
472 }
473
474 pub async fn Unregister(&mut self, Name:&str) -> Result<(), TransportError> {
476 let TransportOption = self.Transports.remove(Name);
477
478 if let Some(_Transport) = TransportOption {
479 log::info!("Unregistered transport: {}", Name);
482
483 Ok(())
484 } else {
485 Err(TransportError::NotFound(format!("Transport '{}' not found", Name)))
486 }
487 }
488
489 pub async fn Select(&mut self, Name:&str) -> Result<(), TransportError> {
491 if !self.Transports.contains_key(Name) {
492 return Err(TransportError::NotFound(format!("Transport '{}' not found", Name)));
493 }
494
495 log::info!("Selecting transport: {}", Name);
496
497 self.Active = Some(Name.to_string());
498
499 Ok(())
500 }
501
502 pub async fn AutoSelect(&mut self, Context:&TransportContext) -> Result<String, TransportError> {
504 let SelectedName = self.Selector.SelectBest(Context)?;
505
506 self.Select(&SelectedName).await?;
507
508 Ok(SelectedName)
509 }
510
511 pub fn GetActive(&self) -> Option<Arc<dyn CommonTransportStrategy>> {
513 self.Active.as_ref().and_then(|Name| self.Transports.get(Name)).cloned()
514 }
515
516 pub fn Get(&self, Name:&str) -> Option<Arc<dyn CommonTransportStrategy>> { self.Transports.get(Name).cloned() }
518
519 pub fn List(&self) -> Vec<String> { self.Transports.keys().cloned().collect() }
521
522 pub fn Has(&self, Name:&str) -> bool { self.Transports.contains_key(Name) }
524
525 pub fn GetAllMetrics(&self) -> HashMap<String, TransportMetrics> {
527 let mut Metrics = HashMap::new();
528
529 for (Name, Transport) in &self.Transports {
530 Metrics.insert(Name.clone(), Transport.Metrics());
531 }
532
533 Metrics
534 }
535
536 pub fn GetHealthStatus(&self) -> HashMap<String, bool> {
538 let mut Status = HashMap::new();
539
540 for (Name, Transport) in &self.Transports {
541 Status.insert(Name.clone(), Transport.IsConnected());
542 }
543
544 Status
545 }
546
547 pub fn ActiveName(&self) -> Option<&str> { self.Active.as_deref() }
549
550 pub fn SetSelector(&mut self, Selector:TransportSelector) { self.Selector = Selector; }
552
553 pub async fn WaitForReady(&self, Name:&str, Timeout:Duration) -> Result<(), TransportError> {
555 use tokio::time::Instant;
556
557 let Start = Instant::now();
558
559 let Transport = self
560 .Get(Name)
561 .ok_or_else(|| TransportError::NotFound(format!("Transport '{}' not found", Name)))?;
562
563 loop {
564 if Transport.IsConnected() {
565 return Ok(());
566 }
567
568 if Start.elapsed() >= Timeout {
569 return Err(TransportError::Timeout("Transport did not become ready within timeout"));
570 }
571
572 tokio::time::sleep(Duration::from_millis(50)).await;
573 }
574 }
575}
576
577impl Default for TransportRegistry {
578 fn default() -> Self { Self::New() }
579}
580
581impl DefaultTransportTypeDetector {
583 pub fn DetectEnvironment() -> EnvironmentInfo {
585 let CurrentPlatform = Platform::Current();
586
587 let IsDesktop = !cfg!(target_arch = "wasm32");
588
589 let Environment =
590 EnvironmentInfo { Platform:CurrentPlatform, IsWeb:false, IsDesktop, BrowserCapabilities:None };
591
592 #[cfg(target_arch = "wasm32")]
593 {
594 EnvironmentInfo {
595 IsWeb:true,
596
597 IsDesktop:false,
598
599 BrowserCapabilities:Some(BrowserCapabilities::default()),
600 ..Environment
601 }
602 }
603
604 #[cfg(not(target_arch = "wasm32"))]
605 {
606 Environment
607 }
608 }
609}
610
611#[cfg(test)]
612mod tests {
613
614 use async_trait::async_trait;
615
616 use super::{
617 super::{
618 TransportConfig::TransportConfig,
619 TransportError::TransportError,
620 TransportStrategy::{TransportCapabilities, TransportMetrics},
621 UnifiedRequest::UnifiedRequest,
622 UnifiedResponse::UnifiedResponse,
623 },
624 *,
625 };
626
627 #[test]
628 fn TestTransportSelectorCreation() {
629 let Selector = TransportSelector::New();
630
631 assert!(!Selector.PriorityOrder.is_empty());
632 }
633
634 #[test]
635 fn TestTransportContextCreation() {
636 let Environment = EnvironmentInfo::New(Platform::Linux, false, true, None);
637
638 let Requirements = TransportRequirements::default();
639
640 let Constraints = TransportConstraints::default();
641
642 let Context = TransportContext::New(Environment, Requirements, Constraints);
643
644 assert!(Context.TransportAvailable(TransportType::Grpc));
647 }
648
649 #[test]
650 fn TestTransportRegistryCreation() {
651 let Registry = TransportRegistry::New();
652
653 assert!(Registry.List().is_empty());
654
655 assert!(Registry.ActiveName().is_none());
656 }
657
658 #[tokio::test]
659 async fn TestRegistryRegisterUnregister() {
660 let mut Registry = TransportRegistry::New();
661
662 let MockTransportInstance = Arc::new(MockTransport::New());
663
664 Registry.Register("mock".to_string(), MockTransportInstance);
665
666 assert!(Registry.Has("mock"));
667
668 assert_eq!(Registry.List().len(), 1);
669
670 Registry.Unregister("mock").await.unwrap();
671
672 assert!(!Registry.Has("mock"));
673 }
674
675 #[derive(Debug, Clone)]
677 struct MockTransport;
678
679 impl MockTransport {
680 fn New() -> Self { Self }
681 }
682
683 #[async_trait]
684 impl CommonTransportStrategy for MockTransport {
685 async fn Connect(&mut self) -> Result<(), TransportError> { Ok(()) }
686
687 async fn Disconnect(&mut self) -> Result<(), TransportError> { Ok(()) }
688
689 async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError> {
690 Ok(UnifiedResponse::Success(
691 Request.CorrelationIdentifier.clone().unwrap_or_default(),
692 Vec::new(),
693 ))
694 }
695
696 async fn SendNotification(&mut self, _Notification:UnifiedRequest) -> Result<(), TransportError> { Ok(()) }
697
698 fn StreamEvents(
699 &self,
700 ) -> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError> {
701 Err(TransportError::NotSupported("Streaming not supported"))
702 }
703
704 fn IsConnected(&self) -> bool { true }
705
706 fn LatencyMilliseconds(&self) -> u64 { 0 }
707
708 fn TransportKind(&self) -> TransportType { TransportType::Grpc }
709
710 fn Configuration(&self) -> &TransportConfig {
711 static CONFIG:std::sync::OnceLock<TransportConfig> = std::sync::OnceLock::new();
712
713 CONFIG.get_or_init(TransportConfig::default)
714 }
715
716 fn Capabilities(&self) -> TransportCapabilities { TransportCapabilities::default() }
717
718 fn Metrics(&self) -> TransportMetrics { TransportMetrics::New() }
719
720 fn SupportsStreaming(&self) -> bool { false }
721 }
722}