From 919c07a3d6694c99b93ce057d6c8f3cb7a9cb836 Mon Sep 17 00:00:00 2001 From: Sergei Martynov Date: Tue, 20 Jan 2026 15:43:54 +0100 Subject: [PATCH 1/5] refactoring and fix linter issues --- src/bin/reporter.rs | 672 ++++++++++++++++++++++++-------------------- src/config.rs | 56 ++++ src/types.rs | 39 +++ 3 files changed, 468 insertions(+), 299 deletions(-) diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs index db66398..503c811 100644 --- a/src/bin/reporter.rs +++ b/src/bin/reporter.rs @@ -3,7 +3,11 @@ //! Post component status to the CloudMon status-dashboard API. //! #![doc(html_no_source)] -use cloudmon_metrics::{api::v1::ServiceHealthResponse, config::Config}; +use cloudmon_metrics::{ + api::v1::ServiceHealthResponse, + config::{Config, StatusDashboardConfig}, + types::{Component, ComponentAttribute, IncidentData, StatusDashboardComponent}, +}; use reqwest::{ header::{HeaderMap, AUTHORIZATION}, @@ -13,9 +17,8 @@ use reqwest::{ use tokio::signal; use tokio::time::{sleep, Duration}; -use anyhow::Result; -use chrono::{DateTime, TimeZone, Utc}; -use serde::{Deserialize, Serialize}; +use anyhow::{Context, Result}; +use chrono::{TimeZone, Utc}; use serde_json::json; use std::collections::HashMap; @@ -27,45 +30,22 @@ use jwt::SignWithKey; use sha2::Sha256; use std::collections::BTreeMap; -#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)] -pub struct ComponentAttribute { - pub name: String, - pub value: String, -} - -#[derive(Clone, Deserialize, Serialize, Debug)] -pub struct Component { - pub name: String, - pub attributes: Vec, -} - -/// Structure for deserializing components from Status Dashboard API v2 (/v2/components). -#[derive(Clone, Deserialize, Serialize, Debug)] -pub struct StatusDashboardComponent { - pub id: u32, - pub name: String, - #[serde(default)] - pub attributes: Vec, -} - -/// Structure for serializing incident data for Status Dashboard API v2 (/v2/incidents). -#[derive(Clone, Deserialize, Serialize, Debug)] -pub struct IncidentData { - pub title: String, - #[serde(default)] - pub description: String, - pub impact: u8, - pub components: Vec, - pub start_date: DateTime, - #[serde(default)] - pub system: bool, - #[serde(rename = "type")] - pub incident_type: String, +/// Component ID cache type alias for clarity +type ComponentIdCache = HashMap<(String, Vec), u32>; + +/// Context for the reporter containing shared state and configuration. +struct ReporterContext<'a> { + req_client: &'a reqwest::Client, + config: &'a Config, + sdb_config: &'a StatusDashboardConfig, + components_url: &'a str, + incidents_url: &'a str, + headers: &'a HeaderMap, } #[tokio::main] async fn main() { - //Enable logging. + // Enable logging. tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), @@ -76,7 +56,13 @@ async fn main() { tracing::info!("Starting cloudmon-metrics-reporter"); // Parse config. - let config = Config::new("config.yaml").unwrap(); + let config = match Config::new("config.yaml") { + Ok(cfg) => cfg, + Err(e) => { + tracing::error!("Failed to load configuration: {}", e); + return; + } + }; // Set up CTRL+C handlers. let ctrl_c = async { @@ -98,7 +84,11 @@ async fn main() { // Execute metric_watcher unless need to stop. tokio::select! { - _ = metric_watcher(&config) => {}, + result = metric_watcher(&config) => { + if let Err(e) = result { + tracing::error!("Metric watcher failed: {}", e); + } + }, _ = ctrl_c => {}, _ = terminate => {}, } @@ -122,21 +112,23 @@ async fn fetch_components( /// # Arguments: /// * `req_client` - The reqwest client; /// * `components_url` - The URL to fetch components from; -/// * `with_retry` - If true, it will retry fetching up to 3 times on failure. +/// * `retry_count` - Number of retry attempts (0 means no retries); +/// * `retry_delay` - Delay between retries in seconds. async fn update_component_cache( req_client: &reqwest::Client, components_url: &str, - with_retry: bool, -) -> Result), u32>> { + retry_count: u32, + retry_delay: u64, +) -> Result { tracing::info!("Updating component cache..."); - let fetch_future = if with_retry { - fetch_components_with_retry(req_client, components_url).await + let fetch_result = if retry_count > 0 { + fetch_components_with_retry(req_client, components_url, retry_count, retry_delay).await } else { fetch_components(req_client, components_url).await.ok() }; - match fetch_future { + match fetch_result { Some(components) if !components.is_empty() => { let cache = build_component_id_cache(components); tracing::info!( @@ -146,16 +138,18 @@ async fn update_component_cache( Ok(cache) } Some(_) => { - // Components list is empty anyhow::bail!("Component list from status-dashboard is empty.") } None => anyhow::bail!("Failed to fetch component list from status-dashboard."), } } + /// Fetches components with a retry mechanism. async fn fetch_components_with_retry( req_client: &reqwest::Client, components_url: &str, + max_attempts: u32, + retry_delay: u64, ) -> Option> { let mut attempts = 0; loop { @@ -166,22 +160,28 @@ async fn fetch_components_with_retry( } Err(e) => { attempts += 1; - tracing::error!("Failed to fetch components (attempt {}/3): {}", attempts, e); - if attempts >= 3 { - tracing::error!("Could not fetch components after 3 attempts. Giving up."); + tracing::error!( + "Failed to fetch components (attempt {}/{}): {}", + attempts, + max_attempts, + e + ); + if attempts >= max_attempts { + tracing::error!( + "Could not fetch components after {} attempts. Giving up.", + max_attempts + ); return None; } - tracing::info!("Retrying in 60 seconds..."); - sleep(Duration::from_secs(60)).await; + tracing::info!("Retrying in {} seconds...", retry_delay); + sleep(Duration::from_secs(retry_delay)).await; } } } } /// Builds a cache mapping (ComponentName, Attributes) -> ComponentID. -fn build_component_id_cache( - components: Vec, -) -> HashMap<(String, Vec), u32> { +fn build_component_id_cache(components: Vec) -> ComponentIdCache { components .into_iter() .map(|c| { @@ -192,272 +192,346 @@ fn build_component_id_cache( .collect() } -async fn metric_watcher(config: &Config) { - tracing::info!("Starting metric reporter thread"); - // Init reqwest client. - let req_client: reqwest::Client = ClientBuilder::new() - .timeout(Duration::from_secs(10 as u64)) - .build() - .unwrap(); - - // This is the logic to build a component lookup table from config. +/// Builds a component lookup table from config. +fn build_components_from_config(config: &Config) -> HashMap> { let mut components_from_config: HashMap> = HashMap::new(); + for env in config.environments.iter() { let comp_env_entry = components_from_config .entry(env.name.clone()) - .or_insert(HashMap::new()); - let mut env_attrs: Vec = Vec::new(); - if let Some(ref attrs) = env.attributes { - for (key, val) in attrs.iter() { - env_attrs.push(ComponentAttribute { - name: key.to_string(), - value: val.clone(), - }); + .or_default(); + + let env_attrs: Vec = env + .attributes + .as_ref() + .map(|attrs| { + attrs + .iter() + .map(|(key, val)| ComponentAttribute { + name: key.to_string(), + value: val.clone(), + }) + .collect() + }) + .unwrap_or_default(); + + for (service_name, health_def) in config.health_metrics.iter() { + if let Some(ref name) = health_def.component_name { + comp_env_entry.insert( + service_name.clone(), + Component { + name: name.clone(), + attributes: env_attrs.clone(), + }, + ); + } else { + tracing::warn!("No component_name is given for {}", health_def.service); } } + } - for component_def in config.health_metrics.iter() { - match component_def.1.component_name { - Some(ref name) => { - comp_env_entry.insert( - component_def.0.clone(), - Component { - name: name.clone(), - attributes: env_attrs.clone(), - }, - ); - } - None => { - tracing::warn!("No component_name is given for {}", component_def.1.service); - } + components_from_config +} + +/// Creates authorisation headers with JWT token if secret is provided. +fn create_auth_headers(secret: Option<&String>) -> Result { + let mut headers = HeaderMap::new(); + + if let Some(secret) = secret { + let key: Hmac = Hmac::new_from_slice(secret.as_bytes()) + .context("Failed to create HMAC key from secret")?; + let mut claims = BTreeMap::new(); + claims.insert("stackmon", "dummy"); + let token_str = claims + .sign_with_key(&key) + .context("Failed to sign JWT token")?; + let bearer = format!("Bearer {}", token_str); + headers.insert( + AUTHORIZATION, + bearer + .parse() + .context("Failed to parse authorization header")?, + ); + } + + Ok(headers) +} + +/// Finds component ID in the cache by name and required attributes. +fn find_component_id( + cache: &ComponentIdCache, + component_name: &str, + required_attrs: &[ComponentAttribute], +) -> Option { + cache + .iter() + .find(|((name, attrs), _id)| { + name == component_name && required_attrs.iter().all(|r| attrs.contains(r)) + }) + .map(|(_, id)| *id) +} + +/// Queries the health API for a specific environment and service. +async fn query_health_api( + req_client: &reqwest::Client, + port: u16, + env_name: &str, + service_name: &str, + query_from: &str, + query_to: &str, +) -> Result { + let response = req_client + .get(format!("http://localhost:{}/api/v1/health", port)) + .query(&[ + ("environment", env_name), + ("service", service_name), + ("from", query_from), + ("to", query_to), + ]) + .send() + .await + .context("Failed to send health API request")?; + + if response.status().is_client_error() { + let error_text = response.text().await.unwrap_or_default(); + anyhow::bail!("Health API client error: {}", error_text); + } + + response + .json::() + .await + .context("Failed to parse health API response") +} + +/// Reports an incident to the Status Dashboard API. +async fn report_incident( + req_client: &reqwest::Client, + incidents_url: &str, + headers: &HeaderMap, + incident: &IncidentData, +) -> Result<()> { + tracing::debug!("IncidentData body: {:?}", incident); + + let response = req_client + .post(incidents_url) + .headers(headers.clone()) + .json(incident) + .send() + .await + .context("Failed to send incident report request")?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await.unwrap_or_default(); + anyhow::bail!("Error reporting incident: [{}] {}", status, error_text); + } + + Ok(()) +} + +/// Processes a single health metric for a component. +async fn process_health_metric( + ctx: &ReporterContext<'_>, + env_name: &str, + service_name: &str, + component: &Component, + component_id_cache: &mut ComponentIdCache, +) -> Result<()> { + let mut data = query_health_api( + ctx.req_client, + ctx.config.server.port, + env_name, + service_name, + &ctx.sdb_config.query_from, + &ctx.sdb_config.query_to, + ) + .await?; + + let Some(last) = data.metrics.pop() else { + return Ok(()); // No metrics available + }; + + if last.value == 0 { + return Ok(()); // Status OK, nothing to report + } + + let shifted_date = Utc + .timestamp_opt(last.ts as i64, 0) + .single() + .map(|ts| ts - chrono::Duration::seconds(1)) + .unwrap_or_else(|| Utc::now() - chrono::Duration::seconds(1)); + + // Get metric names for logging + let metric_names = ctx + .config + .health_metrics + .get(service_name) + .map(|h| h.metrics.clone()) + .unwrap_or_default(); + + // Combined JSON log + let log_obj = json!({ + "timestamp": shifted_date.to_rfc3339(), + "status": last.value, + "service": service_name, + "environment": env_name, + "configured_metrics": metric_names, + "triggered_metrics": last.triggered, + "metric_value": last.metric_value, + "component": { + "name": component.name, + "attributes": component.attributes, + } + }); + tracing::info!("{}", log_obj.to_string()); + + // Search for component ID in the cache using name and attributes + let mut sorted_attrs = component.attributes.clone(); + sorted_attrs.sort(); + + // First attempt to find Component + let mut component_id = find_component_id(component_id_cache, &component.name, &sorted_attrs); + + // If component not found, refresh cache and try again + if component_id.is_none() { + tracing::info!( + "Component '{}' with attributes {:?} not found in cache. Attempting to refresh.", + component.name, + component.attributes + ); + + match update_component_cache(ctx.req_client, ctx.components_url, 0, 0).await { + Ok(new_cache) => { + *component_id_cache = new_cache; + component_id = + find_component_id(component_id_cache, &component.name, &sorted_attrs); + } + Err(e) => { + tracing::warn!( + "Failed to refresh component cache, using old one. Error: {}", + e + ); } } } + let Some(id) = component_id else { + tracing::error!( + "Component with name '{}' and attributes {:?} still not found in status-dashboard cache after refresh.", + component.name, + component.attributes + ); + return Ok(()); + }; + + tracing::info!("Found component ID {} in cache.", id); + + // Build IncidentData body for API v2 + let incident = IncidentData { + title: ctx.sdb_config.incident_title.clone(), + description: ctx.sdb_config.incident_description.clone(), + impact: last.value, + components: vec![id], + start_date: shifted_date, + system: true, + incident_type: "incident".to_string(), + }; + + match report_incident(ctx.req_client, ctx.incidents_url, ctx.headers, &incident).await { + Ok(()) => { + tracing::info!( + "Successfully reported incident for component '{}' with attributes {:?}.", + component.name, + component.attributes + ); + } + Err(e) => { + tracing::error!("Error reporting incident: {}", e); + } + } + + Ok(()) +} + +async fn metric_watcher(config: &Config) -> Result<()> { + tracing::info!("Starting metric reporter thread"); + let sdb_config = config .status_dashboard .as_ref() - .expect("Status dashboard section is missing"); + .context("Status dashboard section is missing")?; - // Fetch components from Status Dashboard and build a cache to resolve component name to ID. - let components_url = format!("{}/v2/components", sdb_config.url.clone()); - let mut component_id_cache = - match update_component_cache(&req_client, &components_url, true).await { - Ok(cache) => cache, - Err(e) => { - tracing::error!( - "Initial component cache load failed: {}. Reporter cannot proceed.", - e - ); - return; - } - }; + // Init reqwest client with configurable timeout + let req_client = ClientBuilder::new() + .timeout(Duration::from_secs(sdb_config.timeout)) + .build() + .context("Failed to build HTTP client")?; + + // Build component lookup table from config + let components_from_config = build_components_from_config(config); + + // Fetch components from Status Dashboard and build a cache to resolve component name to ID + let components_url = format!("{}/v2/components", sdb_config.url); + let mut component_id_cache = update_component_cache( + &req_client, + &components_url, + sdb_config.retry_count, + sdb_config.retry_delay, + ) + .await + .context("Initial component cache load failed. Reporter cannot proceed.")?; // Prepare for incident reporting - let incidents_url = format!("{}/v2/incidents", sdb_config.url.clone()); - let mut headers = HeaderMap::new(); - if let Some(ref secret) = sdb_config.secret { - let key: Hmac = Hmac::new_from_slice(secret.as_bytes()).unwrap(); - let mut claims = BTreeMap::new(); - claims.insert("stackmon", "dummy"); - let token_str = claims.sign_with_key(&key).unwrap(); - let bearer = format!("Bearer {}", token_str); - headers.insert(AUTHORIZATION, bearer.parse().unwrap()); - } + let incidents_url = format!("{}/v2/incidents", sdb_config.url); + let headers = create_auth_headers(sdb_config.secret.as_ref())?; + + // Create context for passing to process_health_metric + let ctx = ReporterContext { + req_client: &req_client, + config, + sdb_config, + components_url: &components_url, + incidents_url: &incidents_url, + headers: &headers, + }; + loop { - // For every env from config. + // For every env from config for env in config.environments.iter() { tracing::trace!("env {:?}", env); - // For every component (health_metric service). - for component_def in config.health_metrics.iter() { - tracing::trace!("Component {:?}", component_def.0); - // Query metric-convertor for the status - match req_client - .get(format!( - "http://localhost:{}/api/v1/health", - config.server.port - )) - // Query env/service for time [-2min..-1min] - .query(&[ - ("environment", env.name.clone()), - ("service", component_def.0.clone()), - ("from", "-5min".to_string()), - ("to", "-2min".to_string()), - ]) - .send() - .await + + let Some(env_components) = components_from_config.get(&env.name) else { + continue; + }; + + // For every component (health_metric service) + for (service_name, _component_def) in config.health_metrics.iter() { + tracing::trace!("Component {:?}", service_name); + + let Some(component) = env_components.get(service_name) else { + continue; + }; + + if let Err(e) = process_health_metric( + &ctx, + &env.name, + service_name, + component, + &mut component_id_cache, + ) + .await { - Ok(rsp) => { - if rsp.status().is_client_error() { - tracing::error!("Got API error {:?}", rsp.text().await); - } else { - // Try to parse response. - match rsp.json::().await { - Ok(mut data) => { - tracing::debug!("response {:?}", data); - // Peek at last metric in the vector - if let Some(last) = data.metrics.pop() { - if last.value > 0 { - // 0 means OK - let shifted_date = Utc - .timestamp_opt(last.ts as i64, 0) - .single() - .map(|ts| ts - chrono::Duration::seconds(1)) - .unwrap_or_else(|| { - Utc::now() - chrono::Duration::seconds(1) - }); - - let component = components_from_config - .get(&env.name) - .unwrap() - .get(component_def.0) - .unwrap(); - - // Try to find metric list for this component. - let metric_names = - match config.health_metrics.get(component_def.0) { - Some(h) => h.metrics.clone(), - None => Vec::new(), - }; - - // Сombined JSON log. - let log_obj = json!({ - "timestamp": shifted_date.to_rfc3339(), - "status": last.value, - "service": component_def.0, - "environment": env.name, - "configured_metrics": metric_names, - "triggered_metrics": last.triggered, - "metric_value": last.metric_value, - "component": { - "name": component.name, - "attributes": component.attributes, - } - }); - - tracing::info!("{}", log_obj.to_string()); - - // Search for component ID in the cache using name and attributes - let mut search_attrs = component.attributes.clone(); - search_attrs.sort(); - let mut required_attrs = component.attributes.clone(); - required_attrs.sort(); - - let find_id = - |cache: &HashMap< - (String, Vec), - u32, - >| { - cache - .iter() - .find(|((name, attrs), _id)| { - if name != &component.name { - return false; - } - required_attrs - .iter() - .all(|r| attrs.contains(r)) - }) - .map(|(_, id)| *id) - }; - - // First attemption to find Component - let mut component_id = find_id(&component_id_cache); - - // If component not found, refresh cache and try again - if component_id.is_none() { - tracing::info!( - "Component '{}' with attributes {:?} not found in cache. Attempting to refresh.", - component.name, component.attributes - ); - - match update_component_cache( - &req_client, - &components_url, - false, - ) - .await - { - Ok(new_cache) => { - component_id_cache = new_cache; - component_id = find_id(&component_id_cache); - } - Err(e) => { - tracing::warn!("Failed to refresh component cache, using old one. Error: {}", e); - } - } - } - - if let Some(id) = component_id { - tracing::info!( - "Found component ID {} in cache.", - id - ); - - // Build IncidentData body for API v2 - let body = IncidentData { - title: "System incident from monitoring system" - .to_string(), - description: "System-wide incident affecting multiple components. Created automatically." - .to_string(), - impact: last.value, - components: vec![id], - start_date: shifted_date, - system: true, - incident_type: "incident".to_string(), - }; - tracing::debug!("IncidentData body: {:?}", body); - let res = req_client - .post(&incidents_url) - .headers(headers.clone()) - .json(&body) - .send() - .await; - match res { - Ok(rsp) => { - if !rsp.status().is_success() { - tracing::error!( - "Error reporting incident: [{}] {:?}", - rsp.status(), - rsp.text().await - ); - } else { - tracing::info!( - "Successfully reported incident for component '{}' with attributes {:?}.", - component.name, - component.attributes - ); - } - } - Err(e) => { - tracing::error!( - "Error during sending post request for incident: {}", - e - ); - } - } - } else { - tracing::error!( - "Component with name '{}' and attributes {:?} still not found in status-dashboard cache after refresh.", - component.name, component.attributes - ); - } - } - } - } - Err(e) => { - tracing::error!("Cannot process response: {}", e); - } - } - } - } - Err(e) => { - tracing::error!("Error: {}", e); - } + tracing::error!( + "Error processing health metric for {}/{}: {}", + env.name, + service_name, + e + ); } } } - // Sleep for some time - sleep(Duration::from_secs(60)).await; + + // Sleep for configurable interval + sleep(Duration::from_secs(sdb_config.poll_interval)).await; } } diff --git a/src/config.rs b/src/config.rs index 5a28f09..86a134c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -173,6 +173,62 @@ pub struct StatusDashboardConfig { pub url: String, /// JWT token signature secret pub secret: Option, + /// Polling interval in seconds (default: 60) + #[serde(default = "default_poll_interval")] + pub poll_interval: u64, + /// Number of retry attempts for fetching components (default: 3) + #[serde(default = "default_retry_count")] + pub retry_count: u32, + /// Delay between retries in seconds (default: 60) + #[serde(default = "default_retry_delay")] + pub retry_delay: u64, + /// HTTP request timeout in seconds (default: 10) + #[serde(default = "default_sdb_timeout")] + pub timeout: u64, + /// Query time range start (default: "-5min") + #[serde(default = "default_query_from")] + pub query_from: String, + /// Query time range end (default: "-2min") + #[serde(default = "default_query_to")] + pub query_to: String, + /// Incident title template (default: "System incident from monitoring system") + #[serde(default = "default_incident_title")] + pub incident_title: String, + /// Incident description template (default: "System-wide incident affecting multiple components. Created automatically.") + #[serde(default = "default_incident_description")] + pub incident_description: String, +} + +fn default_poll_interval() -> u64 { + 60 +} + +fn default_retry_count() -> u32 { + 3 +} + +fn default_retry_delay() -> u64 { + 60 +} + +fn default_sdb_timeout() -> u64 { + 10 +} + +fn default_query_from() -> String { + "-5min".to_string() +} + +fn default_query_to() -> String { + "-2min".to_string() +} + +fn default_incident_title() -> String { + "System incident from monitoring system".to_string() +} + +fn default_incident_description() -> String { + "System-wide incident affecting multiple components. Created automatically.".to_string() } #[cfg(test)] diff --git a/src/types.rs b/src/types.rs index e5c7263..3ac3cb3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,6 +2,7 @@ //! //! Internal types definitions use crate::config::Config; +use chrono::{DateTime, Utc}; use new_string_template::template::Template; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -12,6 +13,44 @@ use std::time::Duration; use reqwest::ClientBuilder; +/// Component attribute for Status Dashboard +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct ComponentAttribute { + pub name: String, + pub value: String, +} + +/// Component definition with name and attributes +#[derive(Clone, Deserialize, Serialize, Debug)] +pub struct Component { + pub name: String, + pub attributes: Vec, +} + +/// Structure for deserializing components from Status Dashboard API v2 (/v2/components). +#[derive(Clone, Deserialize, Serialize, Debug)] +pub struct StatusDashboardComponent { + pub id: u32, + pub name: String, + #[serde(default)] + pub attributes: Vec, +} + +/// Structure for serializing incident data for Status Dashboard API v2 (/v2/incidents). +#[derive(Clone, Deserialize, Serialize, Debug)] +pub struct IncidentData { + pub title: String, + #[serde(default)] + pub description: String, + pub impact: u8, + pub components: Vec, + pub start_date: DateTime, + #[serde(default)] + pub system: bool, + #[serde(rename = "type")] + pub incident_type: String, +} + #[derive(Clone, Debug, Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum CmpType { From 10678c42b486673b51d9924f09ef011d85fe49b6 Mon Sep 17 00:00:00 2001 From: Sergei Martynov Date: Tue, 20 Jan 2026 15:50:01 +0100 Subject: [PATCH 2/5] fix linter issues --- src/api/v1.rs | 6 ++-- src/common.rs | 27 +++++++++-------- src/graphite.rs | 81 ++++++++++++++++++++++++++++--------------------- src/types.rs | 18 +++++------ 4 files changed, 73 insertions(+), 59 deletions(-) diff --git a/src/api/v1.rs b/src/api/v1.rs index b6bd381..a5637e9 100644 --- a/src/api/v1.rs +++ b/src/api/v1.rs @@ -43,15 +43,15 @@ pub struct ServiceHealthResponse { /// Construct supported api v1 routes pub fn get_v1_routes() -> Router { - return Router::new() + Router::new() .route("/", get(root)) .route("/info", get(info)) - .route("/health", get(handler_health)); + .route("/health", get(handler_health)) } /// Return API v1 root info async fn root() -> impl IntoResponse { - return (StatusCode::OK, Json(json!({"name": "v1"}))); + (StatusCode::OK, Json(json!({"name": "v1"}))) } /// Return v1 API infos diff --git a/src/common.rs b/src/common.rs index 7c14410..887b9bc 100644 --- a/src/common.rs +++ b/src/common.rs @@ -12,14 +12,14 @@ use crate::graphite; /// Get Flag value for the metric pub fn get_metric_flag_state(value: &Option, metric: &FlagMetric) -> bool { // Convert raw value to flag - return match *value { + match *value { Some(x) => match metric.op { CmpType::Lt => x < metric.threshold, CmpType::Gt => x > metric.threshold, CmpType::Eq => x == metric.threshold, }, None => false, - }; + } } /// Get Service Health as described by config pub async fn get_service_health( @@ -34,7 +34,7 @@ pub async fn get_service_health( return Err(CloudMonError::ServiceNotSupported); } let hm_config = state.health_metrics.get(service).unwrap(); - let metric_names: Vec = Vec::from(hm_config.metrics.clone()); + let metric_names: Vec = hm_config.metrics.clone(); tracing::trace!("Requesting metrics {:?}", metric_names); let mut graphite_targets: HashMap = HashMap::new(); @@ -57,14 +57,17 @@ pub async fn get_service_health( } } tracing::debug!("Requesting Graphite {:?}", graphite_targets); + let time_range = graphite::GraphiteTimeRange { + from: DateTime::parse_from_rfc3339(from).ok(), + from_raw: Some(from.to_string()), + to: DateTime::parse_from_rfc3339(to).ok(), + to_raw: Some(to.to_string()), + }; let raw_data: Vec = graphite::get_graphite_data( &state.req_client, - &state.config.datasource.url.as_str(), + state.config.datasource.url.as_str(), &graphite_targets, - DateTime::parse_from_rfc3339(from).ok(), - Some(from.to_string()), - DateTime::parse_from_rfc3339(to).ok(), - Some(to.to_string()), + &time_range, max_data_points, ) .await @@ -86,8 +89,8 @@ pub async fn get_service_health( // Iterate over all fetched series for (val, ts) in data_element.datapoints.iter() { // Convert raw value to flag - if let Some(_) = val { - metrics_map.entry(*ts).or_insert(HashMap::new()).insert( + if val.is_some() { + metrics_map.entry(*ts).or_default().insert( data_element.target.clone(), get_metric_flag_state(val, metric), ); @@ -115,7 +118,7 @@ pub async fn get_service_health( _ => false, }; context - .set_value(metric.replace("-", "_").into(), Value::from(xval)) + .set_value(metric.replace("-", "_"), Value::from(xval)) .unwrap(); } let mut expression_res: u8 = 0; @@ -166,5 +169,5 @@ pub async fn get_service_health( tracing::debug!("Summary data: {:?}, length={}", result, result.len()); - return Ok(result); + Ok(result) } diff --git a/src/graphite.rs b/src/graphite.rs index f165e88..c640e09 100644 --- a/src/graphite.rs +++ b/src/graphite.rs @@ -31,6 +31,19 @@ pub struct GraphiteData { pub datapoints: Vec<(Option, u32)>, } +/// Time range parameters for Graphite queries +#[derive(Debug, Default)] +pub struct GraphiteTimeRange { + /// Parsed 'from' datetime + pub from: Option>, + /// Raw 'from' string (e.g., "-5min") + pub from_raw: Option, + /// Parsed 'to' datetime + pub to: Option>, + /// Raw 'to' string (e.g., "-2min") + pub to_raw: Option, +} + #[derive(Debug, Deserialize)] pub struct MetricsQuery { pub query: String, @@ -92,14 +105,14 @@ where } pub fn get_graphite_routes() -> Router { - return Router::new() + Router::new() .route("/functions", get(handler_functions)) .route( "/metrics/find", get(handler_metrics_find_get).post(handler_metrics_find_post), ) .route("/render", get(handler_render).post(handler_render)) - .route("/tags/autoComplete/tags", get(handler_tags)); + .route("/tags/autoComplete/tags", get(handler_tags)) } /// Handler for graphite list supported functions API @@ -206,7 +219,7 @@ pub fn find_metrics(find_request: MetricsQuery, state: AppState) -> Vec } tracing::debug!("Elements {:?}", target_parts); } - return metrics; + metrics } /// POST Handler for graphite find metrics API @@ -217,13 +230,13 @@ pub async fn handler_metrics_find_post( ) -> impl IntoResponse { tracing::debug!("Processing find query={:?}", query); let metrics: Vec = find_metrics(query, state); - return ( + ( StatusCode::OK, Json(json!(metrics .into_iter() .sorted_by(|a, b| Ord::cmp(&a.text, &b.text)) .collect::>())), - ); + ) } /// GET Handler for graphite find metrics API @@ -234,13 +247,13 @@ pub async fn handler_metrics_find_get( ) -> impl IntoResponse { tracing::debug!("Processing find query={:?}", query); let metrics: Vec = find_metrics(query, state); - return ( + ( StatusCode::OK, Json(json!(metrics .into_iter() .sorted_by(|a, b| Ord::cmp(&a.text, &b.text)) .collect::>())), - ); + ) } /// Handler for graphite render API @@ -286,23 +299,23 @@ pub async fn handler_render( } } } else if let Some(metric) = state.flag_metrics.get(&metric_name) { - match metric.get(environment) { - Some(m) => { - graphite_targets.insert(metric_name.clone(), m.query.clone()); - } - _ => {} + if let Some(m) = metric.get(environment) { + graphite_targets.insert(metric_name.clone(), m.query.clone()); }; } tracing::debug!("Requesting Graphite {:?}", graphite_targets); + let time_range = GraphiteTimeRange { + from: None, + from_raw: from, + to: None, + to_raw: to, + }; match get_graphite_data( &state.req_client, - &state.config.datasource.url.as_str(), + state.config.datasource.url.as_str(), &graphite_targets, - None, - from, - None, - to, + &time_range, max_data_points, ) .await @@ -390,10 +403,7 @@ pub async fn get_graphite_data( client: &reqwest::Client, url: &str, targets: &HashMap, - from: Option>, - from_raw: Option, - to: Option>, - to_raw: Option, + time_range: &GraphiteTimeRange, max_data_points: u16, ) -> Result, CloudMonError> { // Prepare vector of query parameters @@ -403,14 +413,14 @@ pub async fn get_graphite_data( ("maxDataPoints", max_data_points.to_string()), ] .into(); - if let Some(xfrom) = from { + if let Some(xfrom) = time_range.from { query_params.push(("from", xfrom.format("%H:%M_%Y%m%d").to_string())); - } else if let Some(xfrom) = from_raw { + } else if let Some(ref xfrom) = time_range.from_raw { query_params.push(("from", xfrom.clone())); } - if let Some(xto) = to { + if let Some(xto) = time_range.to { query_params.push(("until", xto.format("%H:%M_%Y%m%d").to_string())); - } else if let Some(xto) = to_raw { + } else if let Some(ref xto) = time_range.to_raw { query_params.push(("until", xto.clone())); } query_params.extend( @@ -428,18 +438,18 @@ pub async fn get_graphite_data( Ok(rsp) => { if rsp.status().is_client_error() { tracing::error!("Error: {:?}", rsp.text().await); - return Err(CloudMonError::GraphiteError); + Err(CloudMonError::GraphiteError) } else { tracing::trace!("Status: {}", rsp.status()); tracing::trace!("Headers:\n{:#?}", rsp.headers()); match rsp.json().await { - Ok(dt) => return Ok(dt), - Err(_) => return Err(CloudMonError::GraphiteError), + Ok(dt) => Ok(dt), + Err(_) => Err(CloudMonError::GraphiteError), } } } - Err(_) => return Err(CloudMonError::GraphiteError), - }; + Err(_) => Err(CloudMonError::GraphiteError), + } } /// /// Handler for graphite tags API @@ -501,14 +511,17 @@ mod test { let to: Option> = DateTime::parse_from_rfc3339("2022-02-01T00:00:00+00:00").ok(); let max_data_points: u16 = 15; + let time_range = graphite::GraphiteTimeRange { + from, + from_raw: None, + to, + to_raw: None, + }; let _res = aw!(graphite::get_graphite_data( &_req_client, format!("{}", server.url()).as_str(), &targets, - from, - None, - to, - None, + &time_range, max_data_points, )); mock.assert(); diff --git a/src/types.rs b/src/types.rs index 3ac3cb3..3f2f29a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -207,7 +207,7 @@ impl AppState { let timeout = Duration::from_secs(config.datasource.timeout as u64); Self { - config: config, + config, metric_templates: HashMap::new(), flag_metrics: HashMap::new(), req_client: ClientBuilder::new().timeout(timeout).build().unwrap(), @@ -231,19 +231,17 @@ impl AppState { let tmpl = self.metric_templates.get(&tmpl_ref.name).unwrap(); let tmpl_query = Template::new(tmpl.query.clone()).with_regex(&custom_regex); for env in metric_def.environments.iter() { - let mut raw = FlagMetric::default(); - raw.op = tmpl.op.clone(); - raw.threshold = match env.threshold { - Some(x) => x, - None => tmpl.threshold.clone(), - }; let vars: HashMap<&str, &str> = HashMap::from([ ("service", metric_def.service.as_str()), ("environment", env.name.as_str()), ]); - raw.query = tmpl_query.render(&vars).unwrap(); + let raw = FlagMetric { + op: tmpl.op.clone(), + threshold: env.threshold.unwrap_or(tmpl.threshold), + query: tmpl_query.render(&vars).unwrap(), + }; if let Some(x) = self.flag_metrics.get_mut(&metric_name) { - x.insert(env.name.clone(), raw.clone()); + x.insert(env.name.clone(), raw); } else { tracing::error!("Metric processing failed"); } @@ -276,7 +274,7 @@ impl AppState { expression = expression.replace(k, v); } int_metric.expressions.push(MetricExpressionDef { - expression: expression, + expression, weight: expr.weight, }); } From f9f5c0f891707442bd5f79605f969ec95cc155d4 Mon Sep 17 00:00:00 2001 From: Sergei Martynov Date: Tue, 20 Jan 2026 16:01:40 +0100 Subject: [PATCH 3/5] add github actions --- .github/workflows/lint.yml | 77 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 .github/workflows/lint.yml diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..bca4830 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,77 @@ +name: Lint + +on: + push: + branches: + - main + - master + pull_request: + + +env: + CARGO_TERM_COLOR: always + +jobs: + clippy: + name: Clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-action@stable + with: + components: clippy + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-clippy-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-clippy- + + - name: Run Clippy + run: cargo clippy --all-targets --all-features -- -D warnings + + fmt: + name: Rustfmt + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-action@stable + with: + components: rustfmt + + - name: Check formatting + run: cargo fmt --all -- --check + test: + name: Run tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-action@stable + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-test- + + - name: Build + run: cargo build --verbose + + - name: Run tests + run: cargo test --verbose From 3ac33d0e46b088a9e284238046d9159ed906680b Mon Sep 17 00:00:00 2001 From: Sergei Martynov Date: Tue, 20 Jan 2026 16:04:28 +0100 Subject: [PATCH 4/5] small fixes --- .github/workflows/{lint.yml => ci.yml} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename .github/workflows/{lint.yml => ci.yml} (96%) diff --git a/.github/workflows/lint.yml b/.github/workflows/ci.yml similarity index 96% rename from .github/workflows/lint.yml rename to .github/workflows/ci.yml index bca4830..ac7b2b5 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ env: jobs: clippy: - name: Clippy + name: Clippy linter runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -38,7 +38,7 @@ jobs: run: cargo clippy --all-targets --all-features -- -D warnings fmt: - name: Rustfmt + name: Check Rust Formatting runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 From 10d930bc678c12384b203c13f187aa54400214be Mon Sep 17 00:00:00 2001 From: Sergei Martynov Date: Tue, 20 Jan 2026 16:06:36 +0100 Subject: [PATCH 5/5] small fixes --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac7b2b5..2d4c78e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v4 - name: Install Rust toolchain - uses: dtolnay/rust-action@stable + uses: dtolnay/rust-toolchain@stable with: components: clippy @@ -44,7 +44,7 @@ jobs: - uses: actions/checkout@v4 - name: Install Rust toolchain - uses: dtolnay/rust-action@stable + uses: dtolnay/rust-toolchain@stable with: components: rustfmt @@ -57,7 +57,7 @@ jobs: - uses: actions/checkout@v4 - name: Install Rust toolchain - uses: dtolnay/rust-action@stable + uses: dtolnay/rust-toolchain@stable - name: Cache cargo registry uses: actions/cache@v4