diff --git a/Cargo.lock b/Cargo.lock index 6930c0bd..91fdc4e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,6 +414,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "displaydoc" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dtoa" version = "1.0.11" @@ -493,6 +504,9 @@ dependencies = [ "log", "native-tls", "openssl", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "prometheus-client", "prost", "prost-types", @@ -585,6 +599,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -592,6 +615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -600,6 +624,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -625,8 +677,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -871,13 +926,16 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ + "base64", "bytes", "futures-channel", "futures-util", "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2", "tokio", @@ -885,6 +943,109 @@ dependencies = [ "tracing", ] +[[package]] +name = "icu_collections" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" +dependencies = [ + "displaydoc", + "potential_utf", + "utf8_iter", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" + +[[package]] +name = "icu_properties" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" + +[[package]] +name = "icu_provider" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indexmap" version = "2.14.0" @@ -895,6 +1056,12 @@ dependencies = [ "hashbrown 0.17.1", ] +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -983,6 +1150,12 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + [[package]] name = "lock_api" version = "0.4.14" @@ -1139,6 +1312,76 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "portable-atomic", + "rand 0.9.4", + "thiserror", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -1226,6 +1469,15 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1501,6 +1753,37 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" +[[package]] +name = "reqwest" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "rustc-hash" version = "2.1.3" @@ -1660,6 +1943,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -1682,6 +1971,20 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "tempfile" @@ -1716,6 +2019,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.52.3" @@ -1865,6 +2178,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" +dependencies = [ + "bitflags 2.13.0", + "bytes", + "futures-util", + "http", + "http-body", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", + "url", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -1926,6 +2257,24 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1986,6 +2335,16 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62df1340f32221cb9c54d6a27b030e3dba64361d4a95bed55f9aacb44da291d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.126" @@ -2081,6 +2440,12 @@ version = "0.57.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + [[package]] name = "yaml-rust2" version = "0.11.0" @@ -2092,6 +2457,29 @@ dependencies = [ "hashlink", ] +[[package]] +name = "yoke" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709fe23a0424b6a435d82152b1bd3fdfb0833487d5fa90d05d42762a9891fef5" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.8.52" @@ -2112,6 +2500,60 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 8bba7aff..4e649315 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -20,6 +20,9 @@ hyper-util = { workspace = true } libc = { workspace = true } log = { workspace = true } native-tls = { workspace = true } +opentelemetry = { version = "0.32.0", optional = true } +opentelemetry-otlp = { version = "0.32.0", optional = true } +opentelemetry_sdk = { version = "0.32.1", features = [ "logs"], optional = true } openssl = { workspace = true } tonic = { workspace = true } tokio = { workspace = true } @@ -51,3 +54,4 @@ path = "src/main.rs" [features] bpf-test = [] +otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"] diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index ff829eb5..88daa519 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -34,6 +34,7 @@ fn yaml_to_duration_secs(v: &Yaml) -> Option { pub struct FactConfig { paths: Option>, pub grpc: GrpcConfig, + pub otel: OTelConfig, pub endpoint: EndpointConfig, pub bpf: BpfConfig, skip_pre_flight: Option, @@ -72,7 +73,7 @@ impl FactConfig { )?; // Once file configuration is handled, apply CLI arguments - static CLI_ARGS: LazyLock = LazyLock::new(|| FactCli::parse().to_config()); + static CLI_ARGS: LazyLock = LazyLock::new(|| FactCli::parse().into_config()); config.update(&CLI_ARGS); Ok(config) @@ -84,6 +85,7 @@ impl FactConfig { } self.grpc.update(&from.grpc); + self.otel.update(&from.otel); self.endpoint.update(&from.endpoint); self.bpf.update(&from.bpf); @@ -196,6 +198,10 @@ impl TryFrom> for FactConfig { let grpc = v.as_hash().unwrap(); config.grpc = GrpcConfig::try_from(grpc)?; } + "otel" if v.is_hash() => { + let otel = v.as_hash().unwrap(); + config.otel = OTelConfig::try_from(otel)?; + } "endpoint" if v.is_hash() => { let endpoint = v.as_hash().unwrap(); config.endpoint = EndpointConfig::try_from(endpoint)?; @@ -492,6 +498,48 @@ impl TryFrom<&yaml::Hash> for GrpcConfig { } } +#[derive(Debug, Default, PartialEq, Eq, Clone)] +pub struct OTelConfig { + endpoint: Option, +} + +impl OTelConfig { + fn update(&mut self, from: &OTelConfig) { + if let Some(endpoint) = from.endpoint.as_deref() { + self.endpoint = Some(endpoint.to_owned()); + } + } + + pub fn endpoint(&self) -> Option<&str> { + self.endpoint.as_deref() + } +} + +impl TryFrom<&yaml::Hash> for OTelConfig { + type Error = anyhow::Error; + + fn try_from(value: &yaml::Hash) -> Result { + let mut otel = OTelConfig::default(); + for (k, v) in value.iter() { + let Some(k) = k.as_str() else { + bail!("key is not string: {k:?}"); + }; + + match k { + "endpoint" => { + let Some(endpoint) = v.as_str() else { + bail!("otel.endpoint field has incorrect type: {v:?}") + }; + otel.endpoint = Some(endpoint.to_owned()); + } + name => bail!("Invalid field 'otel.{name}' with value: {v:?}"), + } + } + + Ok(otel) + } +} + #[derive(Debug, Default, PartialEq, Eq, Clone)] pub struct BpfConfig { ringbuf_size: Option, @@ -627,6 +675,10 @@ pub struct FactCli { #[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")] backoff_retries_max: Option, + /// OpenTelemetry endpoint to push logs into + #[arg(long, env = "FACT_OTEL_ENDPOINT")] + otel_endpoint: Option, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -711,12 +763,12 @@ pub struct FactCli { } impl FactCli { - fn to_config(&self) -> FactConfig { + fn into_config(self) -> FactConfig { FactConfig { - paths: self.paths.clone(), + paths: self.paths, grpc: GrpcConfig { - url: self.url.clone(), - certs: self.certs.clone(), + url: self.url, + certs: self.certs, backoff: BackoffConfig { initial: self.backoff_initial, max: self.backoff_max, @@ -725,6 +777,9 @@ impl FactCli { retries_max: self.backoff_retries_max, }, }, + otel: OTelConfig { + endpoint: self.otel_endpoint, + }, endpoint: EndpointConfig { address: self.address, expose_metrics: resolve_bool_arg(self.expose_metrics, self.no_expose_metrics), diff --git a/fact/src/config/reloader.rs b/fact/src/config/reloader.rs index 2ac14742..48baecb7 100644 --- a/fact/src/config/reloader.rs +++ b/fact/src/config/reloader.rs @@ -9,12 +9,15 @@ use tokio::{ time::interval, }; +use crate::config::OTelConfig; + use super::{CONFIG_FILES, EndpointConfig, FactConfig, GrpcConfig}; pub struct Reloader { config: FactConfig, endpoint: watch::Sender, grpc: watch::Sender, + otel: watch::Sender, paths: watch::Sender>, files: HashMap<&'static str, i64>, scan_interval: watch::Sender, @@ -71,6 +74,12 @@ impl Reloader { self.grpc.subscribe() } + /// Subscribe to get notifications when otel configuration is + /// changed. + pub fn otel(&self) -> watch::Receiver { + self.otel.subscribe() + } + /// Subscribe to get notifications when paths configuration is /// changed. pub fn paths(&self) -> watch::Receiver> { @@ -174,6 +183,16 @@ impl Reloader { } }); + self.otel.send_if_modified(|old| { + if *old != new.otel { + debug!("Sending new OTel configuration..."); + *old = new.otel.clone(); + true + } else { + false + } + }); + self.paths.send_if_modified(|old| { let new = new.paths(); if *old != new { @@ -238,6 +257,7 @@ impl From for Reloader { .collect(); let (endpoint, _) = watch::channel(config.endpoint.clone()); let (grpc, _) = watch::channel(config.grpc.clone()); + let (otel, _) = watch::channel(config.otel.clone()); let (paths, _) = watch::channel(config.paths().to_vec()); let (scan_interval, _) = watch::channel(config.scan_interval()); let (rate_limit, _) = watch::channel(config.rate_limit()); @@ -247,6 +267,7 @@ impl From for Reloader { config, endpoint, grpc, + otel, paths, scan_interval, rate_limit, diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index 01edbbbf..57f4d401 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -49,6 +49,144 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + initial: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(2)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 30 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(30)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + jitter: false + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + jitter: Some(false), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 3.5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(3.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 0.5 + max: 120 + jitter: false + multiplier: 2 + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + jitter: Some(false), + multiplier: Some(2.0), + retries_max: Some(5), + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + otel: + endpoint: http://localhost:4317 + "#, + FactConfig { + otel: OTelConfig { + endpoint: Some("http://localhost:4317".into()), + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -255,136 +393,12 @@ fn parsing() { ..Default::default() }, ), - ( - r#" - grpc: - backoff: - initial: 2 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - initial: Some(Duration::from_secs(2)), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - max: 30 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - max: Some(Duration::from_secs(30)), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - jitter: false - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - jitter: Some(false), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - multiplier: 2 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - multiplier: Some(2.0), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - multiplier: 3.5 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - multiplier: Some(3.5), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - retries: 5 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - retries_max: Some(5), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - initial: 0.5 - max: 120 - jitter: false - multiplier: 2 - retries: 5 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - initial: Some(Duration::from_secs_f64(0.5)), - max: Some(Duration::from_secs(120)), - jitter: Some(false), - multiplier: Some(2.0), - retries_max: Some(5), - }, - ..Default::default() - }, - ..Default::default() - }, - ), ( r#" paths: - /etc + otel: + endpoint: 'http://localhost:4317' grpc: url: 'https://svc.sensor.stackrox:9090' certs: /etc/stackrox/certs @@ -419,6 +433,9 @@ fn parsing() { retries_max: Some(5), }, }, + otel: OTelConfig { + endpoint: Some("http://localhost:4317".into()), + }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 8080))), expose_metrics: Some(true), @@ -596,6 +613,26 @@ paths: "#, "Invalid field 'grpc.backoff.unknown' with value: Integer(4)", ), + ( + r#" + otel: 5 + "#, + "Invalid field 'otel' with value: Integer(5)", + ), + ( + r#" + otel: + something: true + "#, + "Invalid field 'otel.something' with value: Boolean(true)", + ), + ( + r#" + otel: + endpoint: false + "#, + "otel.endpoint field has incorrect type: Boolean(false)", + ), ( "endpoint: true", "Invalid field 'endpoint' with value: Boolean(true)", @@ -1148,6 +1185,37 @@ fn update() { ..Default::default() }, ), + ( + r#" + otel: + endpoint: 'http://localhost:4317' + "#, + FactConfig::default(), + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), + ( + r#" + otel: + endpoint: 'http://localhost:4317' + "#, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:1234")), + }, + ..Default::default() + }, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1516,6 +1584,8 @@ fn update() { jitter: false multiplier: 3.0 retries: 5 + otel: + endpoint: 'http://localhost:4317' endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1541,6 +1611,9 @@ fn update() { retries_max: Some(20), }, }, + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:1234")), + }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 9000))), expose_metrics: Some(false), @@ -1569,6 +1642,9 @@ fn update() { retries_max: Some(5), }, }, + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([127, 0, 0, 1], 8080))), expose_metrics: Some(true), @@ -1678,7 +1754,7 @@ impl Display for EnvVar { fn with_env_var(env: EnvVar) -> Result { let _guard = env.set(); - FactCli::try_parse_from(["fact"]).map(|cli| cli.to_config()) + FactCli::try_parse_from(["fact"]).map(|cli| cli.into_config()) } #[test] @@ -1866,6 +1942,18 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_OTEL_ENDPOINT", + value: "http://localhost:4317", + }, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", @@ -2011,6 +2099,22 @@ fn env_vars_override_yaml() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_OTEL_ENDPOINT", + value: "http://localhost:4317", + }, + r#" + otel: + endpoint: 'http://localhost:1234' + "#, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_PATHS", diff --git a/fact/src/event/mod.rs b/fact/src/event/mod.rs index a91c5f00..49a50229 100644 --- a/fact/src/event/mod.rs +++ b/fact/src/event/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "otel")] +use std::collections::HashMap; #[cfg(all(test, feature = "bpf-test"))] use std::time::{SystemTime, UNIX_EPOCH}; use std::{ @@ -7,6 +9,8 @@ use std::{ }; use globset::GlobSet; +#[cfg(feature = "otel")] +use opentelemetry::logs::AnyValue; use serde::Serialize; use fact_ebpf::{ @@ -357,6 +361,18 @@ impl From for fact_api::FileActivity { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: Event) -> Self { + AnyValue::Map(Box::new(HashMap::from([ + ("file".into(), value.file.into()), + ("timestamp".into(), AnyValue::Int(value.timestamp as i64)), + ("process".into(), value.process.into()), + ("hostname".into(), value.hostname.into()), + ]))) + } +} + #[cfg(test)] impl PartialEq for Event { fn eq(&self, other: &Self) -> bool { @@ -444,6 +460,22 @@ impl FileData { Ok(file) } + + #[cfg(feature = "otel")] + fn event_type(&self) -> &'static str { + match self { + FileData::Open(_) => "open", + FileData::Creation(_) => "creation", + FileData::MkDir(_) => "mkdir", + FileData::RmDir(_) => "rmdir", + FileData::Unlink(_) => "unlink", + FileData::Chmod(_) => "chmod", + FileData::Chown(_) => "chown", + FileData::Rename(_) => "rename", + FileData::SetXattr(_) => "setxattr", + FileData::RemoveXattr(_) => "removexattr", + } + } } impl From for fact_api::file_activity::File { @@ -494,6 +526,30 @@ impl From for fact_api::file_activity::File { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: FileData) -> Self { + let event_type = value.event_type(); + let AnyValue::Map(mut map) = (match value { + FileData::Open(data) + | FileData::Creation(data) + | FileData::MkDir(data) + | FileData::RmDir(data) + | FileData::Unlink(data) => AnyValue::from(data), + FileData::Chmod(data) => AnyValue::from(data), + FileData::Chown(data) => AnyValue::from(data), + FileData::Rename(data) => AnyValue::from(data), + FileData::SetXattr(data) | FileData::RemoveXattr(data) => AnyValue::from(data), + }) else { + unreachable!("event data did not serialize to map"); + }; + + map.insert("event_type".into(), event_type.into()); + + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for FileData { fn eq(&self, other: &Self) -> bool { @@ -554,6 +610,22 @@ impl From for fact_api::FileActivityBase { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: BaseFileData) -> Self { + AnyValue::Map(Box::new(HashMap::from([ + ( + "filename".into(), + value.filename.to_string_lossy().to_string().into(), + ), + ( + "host_path".into(), + value.host_file.to_string_lossy().to_string().into(), + ), + ]))) + } +} + #[derive(Debug, Clone, Serialize)] pub struct ChmodFileData { inner: BaseFileData, @@ -576,6 +648,21 @@ impl From for fact_api::FilePermissionChange { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: ChmodFileData) -> Self { + let AnyValue::Map(mut map) = value.inner.into() else { + unreachable!("inner value did not serialize to map"); + }; + map.extend([ + ("new_mode".into(), value.new_mode.into()), + ("old_mode".into(), value.old_mode.into()), + ]); + + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for ChmodFileData { fn eq(&self, other: &Self) -> bool { @@ -613,6 +700,23 @@ impl From for fact_api::FileOwnershipChange { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: ChownFileData) -> Self { + let AnyValue::Map(mut map) = value.inner.into() else { + unreachable!("inner value did not serialize to map"); + }; + map.extend([ + ("new_uid".into(), value.new_uid.into()), + ("old_uid".into(), value.old_uid.into()), + ("new_gid".into(), value.new_gid.into()), + ("old_gid".into(), value.old_gid.into()), + ]); + + AnyValue::Map(map) + } +} + #[derive(Debug, Clone, Serialize)] pub struct RenameFileData { new: BaseFileData, @@ -630,6 +734,17 @@ impl From for fact_api::FileRename { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: RenameFileData) -> Self { + let AnyValue::Map(mut map) = value.new.into() else { + unreachable!("new value did not serialize to map"); + }; + map.insert("old".into(), value.old.into()); + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for RenameFileData { fn eq(&self, other: &Self) -> bool { @@ -653,6 +768,18 @@ impl From for fact_api::FileXattrChange { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: XattrFileData) -> Self { + let AnyValue::Map(mut map) = value.inner.into() else { + unreachable!("inner value did not serialize to map"); + }; + map.insert("xattr_name".into(), value.xattr_name.into()); + + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for XattrFileData { fn eq(&self, other: &Self) -> bool { diff --git a/fact/src/event/process.rs b/fact/src/event/process.rs index f295ac66..f18d4e42 100644 --- a/fact/src/event/process.rs +++ b/fact/src/event/process.rs @@ -1,6 +1,10 @@ +#[cfg(feature = "otel")] +use std::collections::HashMap; use std::{ffi::CStr, path::PathBuf}; use fact_ebpf::{lineage_t, process_t}; +#[cfg(feature = "otel")] +use opentelemetry::logs::AnyValue; use serde::Serialize; use uuid::Uuid; @@ -38,6 +42,19 @@ impl From for fact_api::process_signal::LineageInfo { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: Lineage) -> Self { + AnyValue::Map(Box::new(HashMap::from([ + ("uid".into(), value.uid.into()), + ( + "exec_path".into(), + value.exe_path.to_string_lossy().to_string().into(), + ), + ]))) + } +} + #[derive(Debug, Clone, Default, Serialize)] pub struct Process { comm: String, @@ -220,6 +237,44 @@ impl From for fact_api::ProcessSignal { } } +#[cfg(feature = "otel")] +impl From for opentelemetry::logs::AnyValue { + fn from(value: Process) -> Self { + let args = value + .args + .into_iter() + .map(AnyValue::from) + .collect::>(); + + let lineage = value + .lineage + .into_iter() + .map(AnyValue::from) + .collect::>(); + + let mut map = HashMap::from([ + ("comm".into(), value.comm.into()), + ("args".into(), AnyValue::ListAny(Box::new(args))), + ( + "exe_path".into(), + value.exe_path.to_string_lossy().to_string().into(), + ), + ("uid".into(), value.uid.into()), + ("gid".into(), value.gid.into()), + ("login_uid".into(), value.login_uid.into()), + ("username".into(), value.username.into()), + ("in_root_mount_ns".into(), value.in_root_mount_ns.into()), + ("lineage".into(), AnyValue::ListAny(Box::new(lineage))), + ]); + + if let Some(container_id) = value.container_id { + map.insert("container_id".into(), container_id.into()); + } + + AnyValue::Map(Box::new(map)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 8ec187f6..94fe1cc4 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -118,6 +118,7 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { running.subscribe(), exporter.metrics.output.clone(), reloader.grpc(), + reloader.otel(), reloader.config().json(), ); let mut host_scanner_handle = host_scanner.start(); diff --git a/fact/src/metrics/mod.rs b/fact/src/metrics/mod.rs index 98297329..af8cdebc 100644 --- a/fact/src/metrics/mod.rs +++ b/fact/src/metrics/mod.rs @@ -101,6 +101,7 @@ impl EventCounter { pub struct OutputMetrics { pub stdout: EventCounter, pub grpc: EventCounter, + pub otel: EventCounter, } impl OutputMetrics { @@ -116,16 +117,23 @@ impl OutputMetrics { "Events processed by the grpc output component", &labels, ); + let otel_counter = EventCounter::new( + "output_otel_events", + "Events processed by the otel output component", + &labels, + ); OutputMetrics { stdout: stdout_counter, grpc: grpc_counter, + otel: otel_counter, } } fn register(&self, reg: &mut Registry) { self.stdout.register(reg); self.grpc.register(reg); + self.otel.register(reg); } } diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 969a15fd..b643b363 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -9,8 +9,8 @@ use native_tls::{Certificate, Identity}; use openssl::{ec::EcKey, pkey::PKey}; use tokio::{ fs, - sync::{broadcast, watch}, - task::JoinHandle, + sync::{mpsc, oneshot, watch}, + task::JoinSet, time::sleep, }; use tokio_stream::{ @@ -21,8 +21,8 @@ use tonic::transport::Channel; use crate::{ config::{BackoffConfig, GrpcConfig}, - event::Event, metrics::EventCounter, + output::EventReceiver, }; struct Backoff { @@ -104,7 +104,7 @@ impl From<&BackoffConfig> for Backoff { } pub struct Client { - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, config: watch::Receiver, metrics: EventCounter, @@ -112,21 +112,21 @@ pub struct Client { impl Client { pub fn new( - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, config: watch::Receiver, ) -> Self { Client { - rx, + subscriber, running, config, metrics, } } - pub fn start(mut self) -> JoinHandle> { - tokio::spawn(async move { + pub fn start(mut self, set: &mut JoinSet>) { + set.spawn(async move { loop { let res = if self.is_enabled() { self.run().await @@ -144,7 +144,7 @@ impl Client { } } Ok(()) - }) + }); } async fn get_connector(&self) -> anyhow::Result>> { @@ -230,19 +230,21 @@ impl Client { let mut client = FileActivityServiceClient::new(channel); let metrics = self.metrics.clone(); - let rx = - BroadcastStream::new(self.rx.resubscribe()).filter_map(move |event| match event { - Ok(event) => { - metrics.added(); - let event = Arc::unwrap_or_clone(event); - Some(event.into()) - } - Err(BroadcastStreamRecvError::Lagged(n)) => { - warn!("gRPC stream lagged, dropped {n} events"); - metrics.dropped_n(n); - None - } - }); + let (tx, rx) = oneshot::channel(); + self.subscriber.send(tx).await?; + let rx = rx.await?; + let rx = BroadcastStream::new(rx).filter_map(move |event| match event { + Ok(event) => { + metrics.added(); + let event = Arc::unwrap_or_clone(event); + Some(event.into()) + } + Err(BroadcastStreamRecvError::Lagged(n)) => { + warn!("gRPC stream lagged, dropped {n} events"); + metrics.dropped_n(n); + None + } + }); tokio::select! { res = client.communicate(rx) => { diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index 756c8eec..373ff9e2 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -1,17 +1,24 @@ -use std::{borrow::BorrowMut, sync::Arc}; +use std::sync::Arc; -use anyhow::bail; use log::{debug, warn}; use tokio::{ sync::{broadcast, mpsc, watch}, - task::JoinHandle, + task::{JoinHandle, JoinSet}, }; -use crate::{config::GrpcConfig, event::Event, metrics::OutputMetrics}; +use crate::{ + config::{GrpcConfig, OTelConfig}, + event::Event, + metrics::OutputMetrics, +}; mod grpc; +#[cfg(feature = "otel")] +mod otel; mod stdout; +type EventReceiver = broadcast::Receiver>; + /// Starts all the output tasks. /// /// Each task is responsible for managing its lifetime, handling @@ -20,56 +27,79 @@ pub fn start( mut rx: mpsc::Receiver, running: watch::Receiver, metrics: OutputMetrics, - config: watch::Receiver, + grpc_config: watch::Receiver, + #[allow(unused)] otel_config: watch::Receiver, stdout_enabled: bool, ) -> JoinHandle> { - let (broad_tx, broad_rx) = broadcast::channel(100); + let (broad_tx, _) = broadcast::channel(100); + let (subs_req, mut subs_rx) = mpsc::channel(10); let mut run = running.clone(); + let mut handles = JoinSet::new(); + let mut enabled_outputs = Vec::new(); let grpc_client = grpc::Client::new( - broad_rx.resubscribe(), + subs_req.clone(), running.clone(), metrics.grpc.clone(), - config.clone(), + grpc_config, ); + enabled_outputs.push(grpc_client.is_enabled()); + grpc_client.start(&mut handles); - // JSON client will only start if explicitly enabled or no other - // output is active at startup - if !grpc_client.is_enabled() || stdout_enabled { - stdout::Client::new( - broad_rx.resubscribe(), + #[cfg(feature = "otel")] + { + let otel_client = otel::Client::new( + subs_req.clone(), running.clone(), - metrics.stdout.clone(), - ) - .start(); + metrics.otel.clone(), + otel_config, + ); + enabled_outputs.push(otel_client.is_enabled()); + otel_client.start(&mut handles); } - let mut grpc_handle = grpc_client.start(); + // JSON client will only start if explicitly enabled or no other + // output is active at startup + if stdout_enabled || enabled_outputs.iter().all(|enabled| !enabled) { + stdout::Client::new(subs_req, running.clone(), metrics.stdout.clone()).start(); + } tokio::spawn(async move { debug!("Starting output component..."); - loop { + let res = loop { tokio::select! { event = rx.recv() => { let Some(event) = event else { - break; + // Channel has been closed and no more messages + // are present. + break Ok(()); }; if let Err(e) = broad_tx.send(Arc::new(event)) { warn!("Failed to forward output event: {e}"); } } - res = grpc_handle.borrow_mut() => { + req = subs_rx.recv() => { + let Some(req) = req else { continue; }; + let rx = broad_tx.subscribe(); + if let Err(e) = req.send(rx) { + break Err(anyhow::anyhow!("Failed to subscribe worker: {e:?}")); + } + } + res = handles.join_next() => { + let Some(res) = res else { + unreachable!("output handles should always have a task"); + }; match res { - Ok(Ok(_)) => break, - Ok(Err(e)) => bail!("gRPC worker errored out: {e:?}"), - Err(e) => bail!("gRPC task errored out: {e:?}"), + Ok(Ok(_)) => break Ok(()), + Ok(Err(e)) => break Err(e), + Err(e) => break Err(e.into()), } } - _ = run.changed() => if !*run.borrow() { break; } + _ = run.changed() => if !*run.borrow() { break Ok(()); } } - } + }; debug!("Stopping output component..."); - Ok(()) + res }) } diff --git a/fact/src/output/otel.rs b/fact/src/output/otel.rs new file mode 100644 index 00000000..8a33ab3b --- /dev/null +++ b/fact/src/output/otel.rs @@ -0,0 +1,124 @@ +use std::sync::Arc; + +use anyhow::bail; +use log::{debug, info, warn}; +use opentelemetry::logs::{AnyValue, LogRecord, Logger, LoggerProvider, Severity}; +use opentelemetry_otlp::{LogExporter, WithExportConfig}; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::logs::SdkLoggerProvider; +use tokio::{ + sync::{broadcast::error::RecvError, mpsc, oneshot, watch}, + task::JoinSet, +}; + +use crate::{config::OTelConfig, metrics::EventCounter, output::EventReceiver}; + +pub(super) struct Client { + subscriber: mpsc::Sender>, + running: watch::Receiver, + config: watch::Receiver, + metrics: EventCounter, +} + +impl Client { + pub(super) fn new( + subscriber: mpsc::Sender>, + running: watch::Receiver, + metrics: EventCounter, + config: watch::Receiver, + ) -> Self { + Client { + subscriber, + running, + config, + metrics, + } + } + + pub(super) fn start(mut self, set: &mut JoinSet>) { + set.spawn(async move { + loop { + let res = if self.is_enabled() { + self.run().await + } else { + self.idle().await + }; + + match res { + Ok(true) => info!("Reloading oTel configuration..."), + Ok(false) => { + info!("Stopping oTel output..."); + break; + } + Err(e) => bail!("oTel error: {e:?}"), + } + } + Ok(()) + }); + } + + async fn run(&mut self) -> anyhow::Result { + let Some(endpoint) = self.config.borrow().endpoint().map(|e| e.to_string()) else { + bail!("Attempted to unwrap empty endpoint"); + }; + debug!("oTel: forwarding events to {endpoint}"); + let exporter_otlp = LogExporter::builder() + .with_http() + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .with_endpoint(endpoint) + .build() + .expect("Failed to create log exporter"); + + let logger_provider = SdkLoggerProvider::builder() + .with_batch_exporter(exporter_otlp) + .with_resource(Resource::builder().with_service_name("fact").build()) + .build(); + let logger = logger_provider.logger("fact"); + + let (tx, rx) = oneshot::channel(); + self.subscriber.send(tx).await?; + let mut rx = rx.await?; + + let res = loop { + tokio::select! { + event = rx.recv() => { + match event { + Ok(event) => { + self.metrics.added(); + let event = Arc::unwrap_or_clone(event).into(); + let mut record = logger.create_log_record(); + record.set_severity_number(Severity::Info); + if let AnyValue::Map(map) = event { + for (k, v) in *map { + record.add_attribute(k, v); + } + } + logger.emit(record); + } + Err(RecvError::Closed) => break Err(anyhow::anyhow!("oTel: event stream closed")), + Err(RecvError::Lagged(n)) => { + warn!("oTel stream lagged, dropped {n} events"); + self.metrics.dropped_n(n); + } + } + } + _ = self.config.changed() => break Ok(true), + _ = self.running.changed() => break Ok(*self.running.borrow()), + } + }; + + logger_provider.shutdown()?; + res + } + + pub(super) fn is_enabled(&self) -> bool { + self.config.borrow().endpoint().is_some() + } + + async fn idle(&mut self) -> anyhow::Result { + tokio::select! { + _ = self.config.changed() => Ok(true), + _ = self.running.changed() => Ok(*self.running.borrow()), + } + } +} diff --git a/fact/src/output/stdout.rs b/fact/src/output/stdout.rs index debc57fd..7b23e82e 100644 --- a/fact/src/output/stdout.rs +++ b/fact/src/output/stdout.rs @@ -1,27 +1,22 @@ -use std::sync::Arc; - use log::{info, warn}; -use tokio::sync::{ - broadcast::{self, error::RecvError}, - watch, -}; +use tokio::sync::{broadcast::error::RecvError, mpsc, oneshot, watch}; -use crate::{event::Event, metrics::EventCounter}; +use crate::{metrics::EventCounter, output::EventReceiver}; pub struct Client { - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, } impl Client { pub fn new( - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, ) -> Self { Client { - rx, + subscriber, running, metrics, } @@ -29,9 +24,12 @@ impl Client { pub fn start(mut self) { tokio::spawn(async move { + let (tx, rx) = oneshot::channel(); + self.subscriber.send(tx).await.unwrap(); + let mut rx = rx.await.unwrap(); loop { tokio::select! { - event = self.rx.recv() => { + event = rx.recv() => { let event = match event { Ok(event) => event, Err(RecvError::Closed) => {