Skip to main content

CommonLibrary/Telemetry/
EmitOTLPSpan.rs

1#![allow(non_snake_case)]
2
3//! Fire-and-forget OTLP span exporter. Lifted from Mountain's
4//! `IPC/DevLog/EmitOTLPSpan` so Air / Echo / Rest / Grove / Mist /
5//! SideCar all share the same raw HTTP path. Single failed POST flips
6//! `OTLP_AVAILABLE` to false so a missing collector doesn't tax every
7//! emit. Release builds compile out via `cfg!(debug_assertions)`.
8
9use std::{
10	collections::hash_map::DefaultHasher,
11	hash::{Hash, Hasher},
12	sync::{
13		OnceLock,
14		atomic::{AtomicBool, Ordering},
15	},
16	time::{SystemTime, UNIX_EPOCH},
17};
18
19use crate::Telemetry::{Client, IsAllowed};
20
/// Process-wide switch for the exporter. Starts `true`; the emit path stores
/// `false` after a failed delivery attempt and checks it before doing any
/// work, so later emits return immediately.
static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);

/// Trace id shared by every span this process emits. Filled in lazily by the
/// first call that needs it and never changed afterwards.
static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
24
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. The
/// nanosecond count is narrowed from `u128` to `u64` with a plain cast.
fn NowNano() -> u64 {
	match SystemTime::now().duration_since(UNIX_EPOCH) {
		Ok(Elapsed) => Elapsed.as_nanos() as u64,
		Err(_) => 0,
	}
}
31
32fn TraceId() -> &'static str {
33	OTLP_TRACE_ID.get_or_init(|| {
34		let mut H = DefaultHasher::new();
35		std::process::id().hash(&mut H);
36		NowNano().hash(&mut H);
37		format!("{:032x}", H.finish() as u128)
38	})
39}
40
41fn RandU64() -> u64 {
42	let mut H = DefaultHasher::new();
43
44	std::thread::current().id().hash(&mut H);
45
46	NowNano().hash(&mut H);
47
48	H.finish()
49}
50
/// Split an endpoint string into `(host:port, request path)`.
///
/// One leading `http://` or `https://` is dropped. Everything up to the first
/// `/` is the host part; the rest, with redundant leading slashes collapsed
/// into one, is the path. When there is no path, or the path is empty, the
/// path is `/v1/traces`.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
	const DEFAULT_PATH:&str = "/v1/traces";

	// Strip at most one scheme, checking `http://` before `https://`.
	let mut Remainder = Endpoint;

	for Scheme in ["http://", "https://"] {
		if let Some(Stripped) = Endpoint.strip_prefix(Scheme) {
			Remainder = Stripped;

			break;
		}
	}

	// No slash at all: the whole remainder is the host part.
	let Some(SlashAt) = Remainder.find('/') else {
		return (Remainder.to_owned(), DEFAULT_PATH.to_owned());
	};

	let Authority = &Remainder[..SlashAt];

	let Tail = Remainder[SlashAt + 1..].trim_start_matches('/');

	let Route = if Tail.is_empty() { DEFAULT_PATH.to_owned() } else { format!("/{}", Tail) };

	(Authority.to_owned(), Route)
}
67
68/// Emit one span. `StartNano` / `EndNano` are wall-clock (not monotonic)
69/// nanosecond timestamps - use `NowNano()` from the caller's start.
70pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
71	if !IsAllowed::OTLP() {
72		return;
73	}
74
75	if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
76		return;
77	}
78
79	let Configuration = IsAllowed::Cached();
80
81	let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
82
83	let SpanId = format!("{:016x}", RandU64());
84
85	let TraceIdString = TraceId().to_string();
86
87	let SpanName = Name.to_string();
88
89	let AttributesJson:Vec<String> = Attributes
90		.iter()
91		.map(|(K, V)| {
92			format!(
93				r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
94				K,
95				V.replace('\\', "\\\\").replace('"', "\\\"")
96			)
97		})
98		.collect();
99
100	let IsError = SpanName.contains("error");
101
102	let StatusCode = if IsError { 2 } else { 1 };
103
104	let ServiceName = format!("land-editor-{}", TierStr);
105
106	let Payload = format!(
107		concat!(
108			r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
109			r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
110			r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
111			r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
112			r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
113			r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
114			r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
115			r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
116		),
117		ServiceName,
118		TierStr,
119		TierStr,
120		TraceIdString,
121		SpanId,
122		SpanName,
123		StartNano,
124		EndNano,
125		AttributesJson.join(","),
126		StatusCode,
127	);
128
129	let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.OTLPEndpoint);
130
131	std::thread::spawn(move || {
132		use std::{
133			io::{Read as IoRead, Write as IoWrite},
134			net::TcpStream,
135			time::Duration,
136		};
137
138		let Ok(SocketAddress) = HostAddress.parse() else {
139			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
140			return;
141		};
142		let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
143			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
144			return;
145		};
146		let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
147		let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
148
149		let HttpReq = format!(
150			"POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
151			 close\r\n\r\n",
152			PathSegment,
153			HostAddress,
154			Payload.len()
155		);
156		if Stream.write_all(HttpReq.as_bytes()).is_err() {
157			return;
158		}
159		if Stream.write_all(Payload.as_bytes()).is_err() {
160			return;
161		}
162		let mut Buf = [0u8; 32];
163		let _ = Stream.read(&mut Buf);
164		if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
165			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
166		}
167	});
168}
169
170/// Helper exposed to callers that need a span window timestamp.
171pub fn NowNanoPub() -> u64 { NowNano() }