diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 0fea51d..7c843ec 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -361,8 +361,10 @@ dependencies = [ "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", + "opentelemetry-semantic-conventions", "opentelemetry-stdout", "opentelemetry_sdk", + "tokio", "tracing", "tracing-subscriber", ] @@ -992,16 +994,6 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" -[[package]] -name = "iri-string" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1266,9 +1258,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" dependencies = [ "futures-core", "futures-sink", @@ -1280,9 +1272,9 @@ dependencies = [ [[package]] name = "opentelemetry-appender-tracing" -version = "0.31.1" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" +checksum = "2c0080f0dc1d7c786f467cd85a4e395fcab11ee852004f39a29a18ab7c25d837" dependencies = [ "opentelemetry", "tracing", @@ -1292,9 +1284,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" dependencies = [ "async-trait", "bytes", @@ -1305,9 +1297,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" dependencies = [ "http", "opentelemetry", @@ -1319,14 +1311,14 @@ dependencies = [ "thiserror 2.0.16", "tokio", "tonic", - "tracing", + "tonic-types", ] [[package]] name = "opentelemetry-proto" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -1335,11 +1327,17 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ca2f98a0437b427b4b08f19f1caa3c44db885a202bc12cfea13d6c702243d68" + [[package]] name = "opentelemetry-stdout" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811" +checksum = "a1b1c6a247d79091f0062a5f4bd058589525cf987a8d4c169440d9c1be72f0ad" dependencies = [ "chrono", "opentelemetry", @@ -1348,15 +1346,16 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.31.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" dependencies = [ "futures-channel", "futures-executor", "futures-util", "opentelemetry", "percent-encoding", + "portable-atomic", "rand", "thiserror 2.0.16", "tokio", @@ -1502,6 +1501,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.4" @@ -1708,9 +1713,9 @@ checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" [[package]] name = "reqwest" -version = "0.12.24" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" dependencies = [ "base64", "bytes", @@ -1726,9 +1731,6 @@ dependencies = [ "log", "percent-encoding", "pin-project-lite", - "serde", - "serde_json", - "serde_urlencoded", "sync_wrapper", "tokio", "tower", @@ -1926,18 +1928,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -2273,6 +2263,17 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "tonic-types" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a875a902255423d34c1f20838ab374126db8eb41625b7947a1d54113b0b7399" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "tower" version = "0.5.2" @@ -2294,9 +2295,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.6" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "base64", "bitflags", @@ -2304,13 +2305,13 @@ dependencies = [ "futures-util", "http", "http-body", - "iri-string", "mime", "pin-project-lite", "tower", "tower-layer", "tower-service", "tracing", + "url", ] [[package]] diff --git a/core/Cargo.lock b/core/Cargo.lock index 745a66d..6ae4f98 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -415,8 +415,10 @@ dependencies = [ "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", + "opentelemetry-semantic-conventions", "opentelemetry-stdout", "opentelemetry_sdk", + "tokio", "tracing", "tracing-subscriber", ] @@ -1034,16 +1036,6 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" -[[package]] -name = "iri-string" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "itertools" version = "0.14.0" @@ -1239,6 +1231,9 @@ dependencies = [ "cortexbrain-common", "libc", "nix", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "tokio", "tracing", "tracing-subscriber", @@ -1355,9 +1350,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" dependencies = [ "futures-core", "futures-sink", @@ -1369,9 +1364,9 @@ dependencies = [ [[package]] name = "opentelemetry-appender-tracing" -version = "0.31.1" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" +checksum = "2c0080f0dc1d7c786f467cd85a4e395fcab11ee852004f39a29a18ab7c25d837" dependencies = [ "opentelemetry", "tracing", @@ -1381,9 +1376,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" dependencies = [ "async-trait", "bytes", @@ -1394,9 +1389,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" dependencies = [ "http", "opentelemetry", @@ -1408,14 +1403,14 @@ dependencies = [ "thiserror 2.0.17", "tokio", "tonic", - "tracing", + "tonic-types", ] [[package]] name = "opentelemetry-proto" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -1424,11 +1419,17 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ca2f98a0437b427b4b08f19f1caa3c44db885a202bc12cfea13d6c702243d68" + [[package]] name = "opentelemetry-stdout" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811" +checksum = "a1b1c6a247d79091f0062a5f4bd058589525cf987a8d4c169440d9c1be72f0ad" dependencies = [ "chrono", "opentelemetry", @@ -1437,15 +1438,16 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.31.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" dependencies = [ "futures-channel", "futures-executor", "futures-util", "opentelemetry", "percent-encoding", + "portable-atomic", "rand", "thiserror 2.0.17", "tokio", @@ -1585,6 +1587,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.4" @@ -1792,9 +1800,9 @@ checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "reqwest" -version = "0.12.24" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" dependencies = [ "base64", "bytes", @@ -1810,9 +1818,6 @@ dependencies = [ "log", "percent-encoding", "pin-project-lite", - "serde", - "serde_json", - "serde_urlencoded", "sync_wrapper", "tokio", "tower", @@ -2020,18 +2025,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -2219,9 +2212,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.48.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", @@ -2361,6 +2354,17 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "tonic-types" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a875a902255423d34c1f20838ab374126db8eb41625b7947a1d54113b0b7399" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "tower" version = "0.5.2" @@ -2382,9 +2386,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.6" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "base64", "bitflags", @@ -2392,13 +2396,13 @@ dependencies = [ "futures-util", "http", "http-body", - "iri-string", "mime", "pin-project-lite", "tower", "tower-layer", "tower-service", "tracing", + "url", ] [[package]] diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index ee50e2b..e1c39c5 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -16,15 +16,16 @@ anyhow = "1.0" kube = { version = "2.0.1", features = ["client"] } k8s-openapi = { version = "0.26.0", features = ["v1_34"] } aya = "0.13.1" -opentelemetry = "0.31.0" -opentelemetry_sdk = { version = "0.31.0", features = ["logs", "rt-tokio"] } -opentelemetry-stdout = { version = "0.31.0", features = ["logs"] } -opentelemetry-appender-tracing = "0.31.1" -opentelemetry-otlp = { version = "0.31.0", features = ["logs", "grpc-tonic"] } +opentelemetry = "0.32.0" +opentelemetry_sdk = { version = "0.32.0", features = ["logs", "rt-tokio"] } +opentelemetry-stdout = { version = "0.32.0", features = ["logs"] } +opentelemetry-appender-tracing = "0.32.0" +opentelemetry-otlp = { version = "0.32.0", features = ["logs", "grpc-tonic"] } bytemuck = "1.25.0" bytes = "1.11.0" bytemuck_derive = "1.10.2" tokio = "1.49.0" +opentelemetry-semantic-conventions = "0.32.0" [features] map-handlers = [] diff --git a/core/common/src/buffer_type.rs b/core/common/src/buffer_type.rs index f962698..45d82c8 100644 --- a/core/common/src/buffer_type.rs +++ b/core/common/src/buffer_type.rs @@ -1,9 +1,14 @@ +#[cfg(feature = "monitoring-structs")] +use crate::otel_metrics::Metrics; #[cfg(feature = "buffer-reader")] use aya::maps::{MapData, PerfEventArray}; use aya::{maps::perf::PerfEventArrayBuffer, util::online_cpus}; use bytemuck_derive::Zeroable; use bytes::BytesMut; use std::net::Ipv4Addr; +#[cfg(feature = "buffer-reader")] +#[cfg(feature = "monitoring-structs")] +use std::sync::Arc; use tracing::{error, info, warn}; // @@ -342,7 +347,39 @@ impl BufferType { } } #[cfg(feature = "monitoring-structs")] - pub async fn read_network_metrics(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { + /// Continuously read [`NetworkMetrics`] events and record OpenTelemetry + /// observations. + /// + /// This helper mirrors the core behaviour of + /// [`cortexbrain_common::buffer_type::read_perf_buffer`] but adds the OTel + /// instrumentation layer. + /// + /// # Loop + /// + /// 1. For every CPU buffer call `read_events`. + /// 2. Parse each raw [`BytesMut`] into [`NetworkMetrics`] using an + /// unaligned read (the struct is `#[repr(C, packed)]` and `Pod`). + /// 3. Call [`Metrics::record_network_metrics`]. + /// 4. Retain the legacy `tracing::info!` log for human-readable local output. + /// 5. Sleep 100 ms between polls. + /// + /// # Safety + /// + /// `std::ptr::read_unaligned` is safe here because the eBPF program writes + /// exactly the `NetworkMetrics` layout into the ring buffer and the struct + /// implements [`aya::Pod`]. + /// Continuously read [`TimeStampMetrics`] events and record OpenTelemetry + /// observations. + /// + /// Counterpart to [`read_network_buffer`] for the `time_stamp_events` map. + + pub async fn read_network_metrics( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc, + ) { for i in offset..tot_events { let vec_bytes = &buffers[i as usize]; if vec_bytes.len() < std::mem::size_of::() { @@ -361,6 +398,11 @@ impl BufferType { if vec_bytes.len() >= std::mem::size_of::() { let net_metrics: NetworkMetrics = unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_network_metrics(&net_metrics), + _ => continue, // skip + } let tgid = net_metrics.tgid; let comm = String::from_utf8_lossy(&net_metrics.comm); let ts_us = net_metrics.ts_us; @@ -389,7 +431,13 @@ impl BufferType { } } #[cfg(feature = "monitoring-structs")] - pub async fn read_timestamp_metrics(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { + pub async fn read_timestamp_metrics( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc, + ) { for i in offset..tot_events { let vec_bytes = &buffers[i as usize]; if vec_bytes.len() < std::mem::size_of::() { @@ -408,6 +456,12 @@ impl BufferType { if vec_bytes.len() >= std::mem::size_of::() { let time_stamp_event: TimeStampMetrics = unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_timestamp_metrics(&time_stamp_event), + _ => continue, + } + let delta_us = time_stamp_event.delta_us; let ts_us = time_stamp_event.ts_us; let tgid = time_stamp_event.tgid; @@ -431,6 +485,7 @@ pub async fn read_perf_buffer>( mut array_buffers: Vec>, mut buffers: Vec, buffer_type: BufferType, + #[cfg(feature = "monitoring-structs")] metrics: Option>, ) { // loop over the buffers loop { @@ -469,13 +524,29 @@ pub async fn read_perf_buffer>( } #[cfg(feature = "monitoring-structs")] BufferType::NetworkMetrics => { - BufferType::read_network_metrics(&mut buffers, tot_events, offset) - .await + BufferType::read_network_metrics( + &mut buffers, + tot_events, + offset, + "otlp", + metrics + .clone() + .expect("Metrics required for NetworkMetrics"), + ) + .await } #[cfg(feature = "monitoring-structs")] BufferType::TimeStampMetrics => { - BufferType::read_timestamp_metrics(&mut buffers, tot_events, offset) - .await + BufferType::read_timestamp_metrics( + &mut buffers, + tot_events, + offset, + "otlp", + metrics + .clone() + .expect("Metric required for TimeStampMetrics"), + ) + .await } } } diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index d7e48b0..15c4ad7 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -1,7 +1,7 @@ #[cfg(any( feature = "buffer-reader", feature = "network-structs", - feature = "monitoring-structs" + feature = "monitoring-structs", ))] pub mod buffer_type; pub mod constants; @@ -9,5 +9,7 @@ pub mod formatters; pub mod logger; #[cfg(feature = "map-handlers")] pub mod map_handlers; +#[cfg(feature = "monitoring-structs")] +pub mod otel_metrics; #[cfg(feature = "program-handlers")] pub mod program_handlers; diff --git a/core/common/src/otel_metrics.rs b/core/common/src/otel_metrics.rs new file mode 100644 index 0000000..ae8c9db --- /dev/null +++ b/core/common/src/otel_metrics.rs @@ -0,0 +1,133 @@ +//! OpenTelemetry metric instruments for eBPF perf-buffer events. +//! +//! This module centralises every [`Meter`]-backed instrument that the +//! `metrics` crate uses to observe raw eBPF events. It provides a single +//! [`Metrics`] handle that is cheap to [`Arc`]-clone and safe to use from +//! multiple asynchronous tasks concurrently. +//! +//! - An [`Arc`] is moved into each Tokio +//! task that reads a perf buffer. All instrument operations are lock-free. +//! - Every observation is tagged with `tgid` and `comm` +//! extracted from the eBPF struct, allowing downstream collectors to group +//! telemetry by process. + +use crate::buffer_type::{NetworkMetrics, TimeStampMetrics}; +use opentelemetry::KeyValue; +use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter}; +pub struct Metrics { + /// Total number of eBPF events processed across all perf buffers. + pub events_total: Counter, + + /// Total number of network-related events produced by the `net_metrics` + /// eBPF map. + pub packets_total: Counter, + + /// Observed socket drop count (`sk_drops`) from the kernel sock struct. + pub sk_drops: Gauge, + + /// Observed socket error count (`sk_err`) from the kernel sock struct. + pub sk_err: Gauge, + + /// Histogram of `delta_us` values supplied by the `time_stamp_events` + /// perf buffer. + pub delta_us: Histogram, + + /// Histogram of `ts_us` values seen in both `net_metrics` and + /// `time_stamp_events`. + pub ts_us: Histogram, +} + +impl Metrics { + /// Initialise all instruments backed by the supplied [`Meter`]. + pub fn new(meter: &Meter) -> Self { + // total events + let events_total = meter + .u64_counter("cortexbrain_events_total") + .with_description("Total number of eBPF events processed") + .build(); + + // total packets + let packets_total = meter + .u64_counter("cortexbrain_packets_total") + .with_description("Total number of network events processed") + .build(); + + // socket drops + let sk_drops = meter + .i64_gauge("cortexbrain_sk_drops") + .with_description("Socket drop count per event") + .build(); + + // socket errors + let sk_err = meter + .i64_gauge("cortexbrain_sk_err") + .with_description("Socket error count per event") + .build(); + + // delta microseconds + let delta_us = meter + .u64_histogram("cortexbrain_delta_us") + .with_description("Distribution of delta_us values from timestamp events") + .build(); + + // timestamp microseconds grouped + let ts_us = meter + .u64_histogram("cortexbrain_ts_us") + .with_description("Distribution of timestamp values from eBPF events") + .build(); + + Self { + events_total, + packets_total, + sk_drops, + sk_err, + delta_us, + ts_us, + } + } + + /// Record a single [`NetworkMetrics`] event. + /// + /// Increments `events_total` and `packets_total`, records `sk_drops` and + /// `sk_err` as gauges, and observes `ts_us` in the timestamp histogram. + /// + /// Every observation carries: + /// + /// -`tgid` – task group ID. + /// - `comm` – command name (null-terminated bytes converted to a UTF-8 + /// string and trimmed). + pub fn record_network_metrics(&self, m: &NetworkMetrics) { + let comm = String::from_utf8_lossy(&m.comm); + let comm_trimmed = comm.trim_end_matches('\0').to_string(); + let attrs = &[ + KeyValue::new("tgid", m.tgid as i64), + KeyValue::new("comm", comm_trimmed), + ]; + + self.events_total.add(1, attrs); + self.packets_total.add(1, attrs); + self.sk_drops.record(m.sk_drops as i64, attrs); + self.sk_err.record(m.sk_err as i64, attrs); + self.ts_us.record(m.ts_us, attrs); + } + + /// Record a single [`TimeStampMetrics`] event. + /// + /// Increments `events_total`, and records `delta_us` and `ts_us` in their + /// respective histograms. + /// + /// Every observation carries `tgid` and `comm` (see + /// [`record_network_metrics`]). + pub fn record_timestamp_metrics(&self, m: &TimeStampMetrics) { + let comm = String::from_utf8_lossy(&m.comm); + let comm_trimmed = comm.trim_end_matches('\0').to_string(); + let attrs = &[ + KeyValue::new("tgid", m.tgid as i64), + KeyValue::new("comm", comm_trimmed), + ]; + + self.events_total.add(1, attrs); + self.delta_us.record(m.delta_us, attrs); + self.ts_us.record(m.ts_us, attrs); + } +} diff --git a/core/src/components/metrics/Cargo.toml b/core/src/components/metrics/Cargo.toml index c8dcb5b..1c7d420 100644 --- a/core/src/components/metrics/Cargo.toml +++ b/core/src/components/metrics/Cargo.toml @@ -28,3 +28,6 @@ cortexbrain-common = { path = "../../../common/", features = [ "network-structs" ] } nix = { version = "0.30.1", features = ["net"] } +opentelemetry = "0.32.0" +opentelemetry_sdk = "0.32.0" +opentelemetry-otlp = { version = "0.32.0", features = ["grpc-tonic"] } diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs index 843f45d..804e930 100644 --- a/core/src/components/metrics/src/helpers.rs +++ b/core/src/components/metrics/src/helpers.rs @@ -1,14 +1,34 @@ use anyhow::anyhow; use aya::util::online_cpus; use cortexbrain_common::map_handlers::map_manager; -use cortexbrain_common::{ - buffer_type::{BufferSize, BufferType, read_perf_buffer}, - map_handlers::BpfMapsData, -}; +use cortexbrain_common::{buffer_type::BufferSize, map_handlers::BpfMapsData}; +use opentelemetry::metrics::Meter; +use std::sync::Arc; use tokio::signal; use tracing::{error, info}; -pub async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> { +use cortexbrain_common::buffer_type::{BufferType, read_perf_buffer}; +use cortexbrain_common::otel_metrics::Metrics; + +/// Listen for eBPF perf-buffer events and record OpenTelemetry metrics. +/// +/// This function bridges the eBPF perf-buffer layer with the OpenTelemetry +/// metrics pipeline. It opens per-CPU buffers for the two maps of interest +/// (`net_metrics` and `time_stamp_events`), spawns asynchronous consumers, +/// and parks until a `Ctrl-C` signal is received or one of the consumers +/// terminates. +/// +/// # Arguments +/// +/// -`bpf_maps` – handles for the pinned BPF maps produced by +/// [`cortexbrain_common::map_handlers::map_pinner`]. +/// - `meter` – an initialised OpenTelemetry [`Meter`]. +/// +/// # Errors +/// +/// Returns `Err` if the map manager or CPU enumeration fails. +/// +pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), anyhow::Error> { info!("Getting CPU count..."); let mut maps = map_manager(bpf_maps)?; @@ -35,48 +55,63 @@ pub async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> info!("Perf buffers created successfully"); - let (time_stamp_events_array, time_stamp_events_perf_buffer) = maps + let (_time_stamp_events_array, time_stamp_events_perf_buffer) = maps .remove("time_stamp_events") .expect("Cannot create time_stamp_events_buffer"); - let (net_perf_array, net_perf_buffer) = maps + let (_net_perf_array, net_perf_buffer) = maps .remove("net_metrics") .expect("Cannot create net_perf_buffer"); - // Create proper sized buffers + // Allocate byte-buffers sized for each structure type let net_metrics_buffers = BufferSize::NetworkMetricsEvents.set_buffer(); let time_stamp_events_buffers = BufferSize::TimeMetricsEvents.set_buffer(); + let metrics = Arc::new(Metrics::new(&meter)); + info!("Starting event listener tasks..."); - let metrics_map_displayer = tokio::spawn(async move { - read_perf_buffer( - net_perf_buffer, - net_metrics_buffers, - BufferType::NetworkMetrics, - ) - .await; - }); - - let time_stamp_events_displayer = tokio::spawn(async move { - read_perf_buffer( - time_stamp_events_perf_buffer, - time_stamp_events_buffers, - BufferType::TimeStampMetrics, - ) - .await; - }); + + let net_metrics_handle = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = net_perf_buffer; + let mut buffers = net_metrics_buffers; + tokio::spawn(async move { + read_perf_buffer( + array_buffers, + buffers, + BufferType::NetworkMetrics, + Some(metrics), + ) + .await; + }) + }; + + let time_stamp_handle = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = time_stamp_events_perf_buffer; + let mut buffers = time_stamp_events_buffers; + tokio::spawn(async move { + read_perf_buffer( + array_buffers, + buffers, + BufferType::TimeStampMetrics, + Some(metrics), + ) + .await; + }) + }; info!("Event listeners started, entering main loop..."); tokio::select! { - result = metrics_map_displayer => { + result = net_metrics_handle => { if let Err(e) = result { - error!("Metrics map displayer task failed: {:?}", e); + error!("Network metrics task failed: {:?}", e); } } - result = time_stamp_events_displayer => { + result = time_stamp_handle => { if let Err(e) = result { - error!("Time stamp events displayer task failed: {:?}", e); + error!("Timestamp events task failed: {:?}", e); } } @@ -85,6 +120,5 @@ pub async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> } } - // return success Ok(()) } diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs index e5558eb..0211be6 100644 --- a/core/src/components/metrics/src/main.rs +++ b/core/src/components/metrics/src/main.rs @@ -1,4 +1,15 @@ -use anyhow::{Context, Ok}; +//! CortexBrain metrics service – eBPF-based telemetry with OpenTelemetry export. +//! +//! This binary is the node-level metrics agent for CortexBrain. It: +//! +//! 1. Initialises an OpenTelemetry metrics pipeline (OTLP / gRPC). +//! 2. Loads a compiled eBPF object and pins its maps to the BPF filesystem. +//! 3. Attaches a set of kernel kprobe programs. +//! 4. Starts asynchronous consumers that read per-CPU perf buffers and +//! emit OpenTelemetry instruments for every event. +//! 5. Blocks until `Ctrl-C` is received, then shuts down cleanly. + +use anyhow::Context; use aya::Ebpf; use std::{ env, fs, @@ -6,9 +17,10 @@ use std::{ sync::{Arc, Mutex}, }; use tracing::{error, info}; - mod helpers; +mod otel_init; use crate::helpers::event_listener; +use crate::otel_init::{init_opentelemetry, shutdown_opentelemetry}; use cortexbrain_common::{ constants, @@ -19,12 +31,14 @@ use cortexbrain_common::{ #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - //init tracing subscriber - let otlp_provider = otlp_logger_init("metrics-service".to_string()); + let _otlp_log_provider = otlp_logger_init("metrics-service".to_string()); info!("Starting metrics service..."); info!("fetching data"); + let meter = + init_opentelemetry().context("Failed to initialise OpenTelemetry metrics pipeline")?; + let bpf_path = env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?; let data = fs::read(Path::new(&bpf_path)).context("Failed to load file from path")?; @@ -35,30 +49,33 @@ async fn main() -> Result<(), anyhow::Error> { info!("Running Ebpf logger"); info!("loading programs"); - let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH) - .context("PIN_MAP_PATH environment variable required")?; + + let bpf_map_save_path = + env::var(constants::PIN_MAP_PATH).context("PIN_MAP_PATH environment variable required")?; let map_data = vec!["time_stamp_events".to_string(), "net_metrics".to_string()]; match init_bpf_maps(bpf.clone(), map_data) { - std::result::Result::Ok(bpf_maps) => { + Ok(bpf_maps) => { info!("BPF maps loaded successfully"); let pin_path = std::path::PathBuf::from(&bpf_map_save_path); info!("About to call map_pinner with path: {:?}", pin_path); + match map_pinner(bpf_maps, &pin_path) { - std::result::Result::Ok(maps) => { + Ok(maps) => { info!("BPF maps pinned successfully to {}", bpf_map_save_path); { load_program(bpf.clone(), "metrics_tracer", "tcp_identify_packet_loss") .context( - "An error occured during the execution of load_program function", + "An error occurred during the execution of load_program function", )?; - load_program(tcp_bpf,"tcp_v4_connect","tcp_v4_connect") - .context("An error occured during the execution of load_and_attach_tcp_programs function")?; - load_program(tcp_v6_bpf,"tcp_v6_connect","tcp_v6_connect") - .context("An error occured during the execution of load_and_attach_tcp_programs function")?; + load_program(tcp_bpf, "tcp_v4_connect", "tcp_v4_connect") + .context("An error occurred during the execution of load_and_attach_tcp_programs function")?; + + load_program(tcp_v6_bpf, "tcp_v6_connect", "tcp_v6_connect") + .context("An error occurred during the execution of load_and_attach_tcp_programs function")?; load_program( tcp_rev_bpf, @@ -66,23 +83,24 @@ async fn main() -> Result<(), anyhow::Error> { "tcp_rcv_state_process", ) .context( - "An error occured during the execution of load_program function", + "An error occurred during the execution of load_program function", )?; } - event_listener(maps).await?; + + // Hand off to the async event consumer + event_listener(maps, meter).await } Err(e) => { error!("Error pinning BPF maps: {:?}", e); - return Err(e); + shutdown_opentelemetry(); + Err(e) } } } Err(e) => { error!("Error initializing BPF maps: {:?}", e); - let _ = otlp_provider.shutdown(); - return Err(e); + shutdown_opentelemetry(); + Err(e) } } - - Ok(()) } diff --git a/core/src/components/metrics/src/mod.rs b/core/src/components/metrics/src/mod.rs index 8414b63..c5e2806 100644 --- a/core/src/components/metrics/src/mod.rs +++ b/core/src/components/metrics/src/mod.rs @@ -1,3 +1,2 @@ -mod structs; -mod enums; -mod helpers; \ No newline at end of file +mod helpers; +mod otel_init; diff --git a/core/src/components/metrics/src/otel_init.rs b/core/src/components/metrics/src/otel_init.rs new file mode 100644 index 0000000..e472c7e --- /dev/null +++ b/core/src/components/metrics/src/otel_init.rs @@ -0,0 +1,120 @@ +//! docs +//! This module configures and bootstraps the OpenTelemetry SDK (OTel SDK) +//! within the `metrics` binary. Its goal is to expose a [`Meter`] --- the +//! primary entry-point for creating counters, gauges and histograms --- +//! backed by an **OTLP/gRPC** metric exporter. +//! +//! # Relationship to the rest of the crate +//! +//! `otel_init::init_opentelemetry()` is invoked **once** in [`main`], before +//! any eBPF program is loaded. The returned [`Meter`] is then passed through +//! the call chain into [`event_listener`](crate::helpers::event_listener) +//! where it is used by the async tasks that read eBPF perf-buffers. See +//! [`crate::helpers`] for the consumption side. +//! +//! When the application exits (either because `Ctrl-C` was received or because +//! an error bubbled up), [`shutdown_opentelemetry`] is called. This flushes +//! every remaining aggregated metric to the OTLP collector before the process +//! terminates. +//! + +use opentelemetry::global; +use opentelemetry::metrics::{Meter, MeterProvider}; +use opentelemetry_otlp::{MetricExporter, WithExportConfig}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use std::env; +use std::sync::OnceLock; +use std::time::Duration; + +/// Environment variable that holds the OTLP collector endpoint. +/// +/// Expected format: `"http://collector:4317"` (gRPC transport). +/// +pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT"; + +/// Default OTLP endpoint used when [`OTEL_EXPORTER_OTLP_ENDPOINT`] is not +/// present in the environment. +/// +/// Points to a locally-running OpenTelemetry Collector on the standard +/// **gRPC** port `4317`. Note that OTLP over HTTP typically uses `4318` --- +/// make sure your Collector is actually listening for **gRPC** traffic on the +/// port you configure. +pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317"; + +/// Singleton that owns the concrete `SdkMeterProvider` instance. +/// OnceLock guarantees single initialisation, we avoid accidentally creating two providers (and +/// two background export tasks) if `init_opentelemetry()` were ever called +/// twice. +/// +/// # Thread safety +/// +/// `OnceLock` is `Sync`, so the static can be read safely from any thread +/// or Tokio task once populated. +static METER_PROVIDER: OnceLock = OnceLock::new(); +/// docs: +/// Initialise the OpenTelemetry SDK, wire up the OTLP/gRPC exporter, and +/// return a [`Meter`] ready for instrumenting the `metrics` crate. +/// +/// 1. Read the endpoint from [`OTEL_EXPORTER_OTLP_ENDPOINT`] with the +/// hard-coded default [`DEFAULT_OTLP_ENDPOINT`]. +/// 2. Build a `MetricExporter` using the Tonic / gRPC transport: +/// - with_tonic()` enables the Tonic-based gRPC client. +/// - `with_endpoint()` sets the target Collector URL. +/// - `with_timeout(Duration::from_secs(10))` caps each export RPC to 10 +/// seconds; if the Collector is unreachable the RPC aborts instead of +/// hanging indefinitely. +/// 3. Wrap the exporter in a `PeriodicReader`. The reader collects +/// aggregated metrics from every instrument every 5 seconds and hands +/// them to the exporter. This is the "push" model --- metrics leave the +/// process automatically without an external scraper. +/// 4. Construct an `SdkMeterProvider` and register it as the global +/// meter provider (`global::set_meter_provider`). The global handle is +/// needed for instrumenting code spawned in other Tokio tasks (see +/// [`helpers::event_listener`](crate::helpers::event_listener)). +/// 5. Keep a clone of the concrete provider in `METER_PROVIDER` so that +/// [`shutdown_opentelemetry`] can later call `SdkMeterProvider::shutdown()`. +/// 6. Create a `Meter named `"cortexbrain-metrics"` and return it. +/// +/// Potential causes of errors: +/// +/// * An invalid endpoint URL (malformed string). +/// * Network-level failure during exporter construction. +/// * The provider already having been initialised +/// +pub fn init_opentelemetry() -> Result { + let endpoint = + env::var(OTEL_EXPORTER_OTLP_ENDPOINT).unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string()); + + let exporter = MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_timeout(Duration::from_secs(10)) + .build()?; + + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(5)) + .build(); + + let provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Make the provider globally discoverable. This clone is cheap because + // SdkMeterProvider is an Arc-backed handle. + global::set_meter_provider(provider.clone()); + + // Stash the concrete handle so shutdown_opentelemetry can flush. + METER_PROVIDER + .set(provider.clone()) + .map_err(|_| anyhow::anyhow!("OpenTelemetry meter provider already initialised"))?; + + let meter = provider.meter("cortexbrain-metrics"); + Ok(meter) +} +/// docs: +/// Flush every buffered metric to the OTLP collector and shut down the SDK. +pub fn shutdown_opentelemetry() { + if let Some(provider) = METER_PROVIDER.get() + && let Err(e) = provider.shutdown() + { + tracing::error!("Failed to shut down OpenTelemetry meter provider: {:?}", e); + } +} diff --git a/core/src/testing/metrics.yaml b/core/src/testing/metrics.yaml index 262b28f..8a6c7d8 100644 --- a/core/src/testing/metrics.yaml +++ b/core/src/testing/metrics.yaml @@ -19,7 +19,7 @@ spec: hostNetwork: true containers: - name: metrics - image: lorenzotettamanti/cortexflow-metrics:0.1.2-test12 + image: lorenzotettamanti/cortexflow-metrics:otel-test-2 command: ["/bin/bash", "-c"] args: - | diff --git a/core/src/testing/otel_agent.yaml b/core/src/testing/otel_agent.yaml index 71b7e08..c5165ac 100644 --- a/core/src/testing/otel_agent.yaml +++ b/core/src/testing/otel_agent.yaml @@ -33,6 +33,9 @@ data: logs: receivers: [otlp] exporters: [otlp, logging] + metrics: + receivers: [otlp] + exporters: [otlp, logging] --- apiVersion: apps/v1 @@ -132,6 +135,10 @@ data: receivers: [otlp] processors: [memory_limiter] exporters: [logging] + metrics: + receivers: [otlp] + processors: [memory_limiter] + exporters: [logging] --- apiVersion: v1