Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
442 changes: 442 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions fact/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ hyper-util = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
native-tls = { workspace = true }
opentelemetry = { version = "0.32.0", optional = true }
opentelemetry-otlp = { version = "0.32.0", optional = true }
opentelemetry_sdk = { version = "0.32.1", features = [ "logs"], optional = true }
openssl = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
Expand Down Expand Up @@ -51,3 +54,4 @@ path = "src/main.rs"

[features]
bpf-test = []
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"]
65 changes: 60 additions & 5 deletions fact/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ fn yaml_to_duration_secs(v: &Yaml) -> Option<Duration> {
pub struct FactConfig {
paths: Option<Vec<PathBuf>>,
pub grpc: GrpcConfig,
pub otel: OTelConfig,
pub endpoint: EndpointConfig,
pub bpf: BpfConfig,
skip_pre_flight: Option<bool>,
Expand Down Expand Up @@ -72,7 +73,7 @@ impl FactConfig {
)?;

// Once file configuration is handled, apply CLI arguments
static CLI_ARGS: LazyLock<FactConfig> = LazyLock::new(|| FactCli::parse().to_config());
static CLI_ARGS: LazyLock<FactConfig> = LazyLock::new(|| FactCli::parse().into_config());
config.update(&CLI_ARGS);

Ok(config)
Expand All @@ -84,6 +85,7 @@ impl FactConfig {
}

self.grpc.update(&from.grpc);
self.otel.update(&from.otel);
self.endpoint.update(&from.endpoint);
self.bpf.update(&from.bpf);

Expand Down Expand Up @@ -196,6 +198,10 @@ impl TryFrom<Vec<Yaml>> for FactConfig {
let grpc = v.as_hash().unwrap();
config.grpc = GrpcConfig::try_from(grpc)?;
}
"otel" if v.is_hash() => {
let otel = v.as_hash().unwrap();
config.otel = OTelConfig::try_from(otel)?;
}
"endpoint" if v.is_hash() => {
let endpoint = v.as_hash().unwrap();
config.endpoint = EndpointConfig::try_from(endpoint)?;
Expand Down Expand Up @@ -492,6 +498,48 @@ impl TryFrom<&yaml::Hash> for GrpcConfig {
}
}

#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct OTelConfig {
endpoint: Option<String>,
}

impl OTelConfig {
fn update(&mut self, from: &OTelConfig) {
if let Some(endpoint) = from.endpoint.as_deref() {
self.endpoint = Some(endpoint.to_owned());
}
}

pub fn endpoint(&self) -> Option<&str> {
self.endpoint.as_deref()
}
}

impl TryFrom<&yaml::Hash> for OTelConfig {
type Error = anyhow::Error;

fn try_from(value: &yaml::Hash) -> Result<Self, Self::Error> {
let mut otel = OTelConfig::default();
for (k, v) in value.iter() {
let Some(k) = k.as_str() else {
bail!("key is not string: {k:?}");
};

match k {
"endpoint" => {
let Some(endpoint) = v.as_str() else {
bail!("otel.endpoint field has incorrect type: {v:?}")
};
otel.endpoint = Some(endpoint.to_owned());
}
name => bail!("Invalid field 'otel.{name}' with value: {v:?}"),
}
}

Ok(otel)
}
}

#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct BpfConfig {
ringbuf_size: Option<u32>,
Expand Down Expand Up @@ -627,6 +675,10 @@ pub struct FactCli {
#[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")]
backoff_retries_max: Option<u64>,

/// OpenTelemetry endpoint to push logs into
#[arg(long, env = "FACT_OTEL_ENDPOINT")]
otel_endpoint: Option<String>,

/// The port to bind for all exposed endpoints
#[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")]
address: Option<SocketAddr>,
Expand Down Expand Up @@ -711,12 +763,12 @@ pub struct FactCli {
}

impl FactCli {
fn to_config(&self) -> FactConfig {
fn into_config(self) -> FactConfig {
FactConfig {
paths: self.paths.clone(),
paths: self.paths,
grpc: GrpcConfig {
url: self.url.clone(),
certs: self.certs.clone(),
url: self.url,
certs: self.certs,
backoff: BackoffConfig {
initial: self.backoff_initial,
max: self.backoff_max,
Expand All @@ -725,6 +777,9 @@ impl FactCli {
retries_max: self.backoff_retries_max,
},
},
otel: OTelConfig {
endpoint: self.otel_endpoint,
},
endpoint: EndpointConfig {
address: self.address,
expose_metrics: resolve_bool_arg(self.expose_metrics, self.no_expose_metrics),
Expand Down
21 changes: 21 additions & 0 deletions fact/src/config/reloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use tokio::{
time::interval,
};

use crate::config::OTelConfig;

use super::{CONFIG_FILES, EndpointConfig, FactConfig, GrpcConfig};

pub struct Reloader {
config: FactConfig,
endpoint: watch::Sender<EndpointConfig>,
grpc: watch::Sender<GrpcConfig>,
otel: watch::Sender<OTelConfig>,
paths: watch::Sender<Vec<PathBuf>>,
files: HashMap<&'static str, i64>,
scan_interval: watch::Sender<Duration>,
Expand Down Expand Up @@ -71,6 +74,12 @@ impl Reloader {
self.grpc.subscribe()
}

/// Subscribe to get notifications when otel configuration is
/// changed.
pub fn otel(&self) -> watch::Receiver<OTelConfig> {
self.otel.subscribe()
}

/// Subscribe to get notifications when paths configuration is
/// changed.
pub fn paths(&self) -> watch::Receiver<Vec<PathBuf>> {
Expand Down Expand Up @@ -174,6 +183,16 @@ impl Reloader {
}
});

self.otel.send_if_modified(|old| {
if *old != new.otel {
debug!("Sending new OTel configuration...");
*old = new.otel.clone();
true
} else {
false
}
});

self.paths.send_if_modified(|old| {
let new = new.paths();
if *old != new {
Expand Down Expand Up @@ -238,6 +257,7 @@ impl From<FactConfig> for Reloader {
.collect();
let (endpoint, _) = watch::channel(config.endpoint.clone());
let (grpc, _) = watch::channel(config.grpc.clone());
let (otel, _) = watch::channel(config.otel.clone());
let (paths, _) = watch::channel(config.paths().to_vec());
let (scan_interval, _) = watch::channel(config.scan_interval());
let (rate_limit, _) = watch::channel(config.rate_limit());
Expand All @@ -247,6 +267,7 @@ impl From<FactConfig> for Reloader {
config,
endpoint,
grpc,
otel,
paths,
scan_interval,
rate_limit,
Expand Down
Loading
Loading