From 2ab5cd2ce64eefe0a1b38c0de3b98f7ab002a6c7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 28 Jun 2026 12:47:43 +0700 Subject: [PATCH 1/6] add alert query mode metadata persist queryType for alerts: builder, code, or promql defaults old alerts to builder accepts legacy queryType=sql as code reserves old creationType keys treats builder/code as SQL modes for auth and dataset resolution. --- src/alerts/alert_enums.rs | 20 ++++++++++++++++++++ src/alerts/alert_structs.rs | 33 +++++++++++++++++++++++++++++---- src/alerts/alert_traits.rs | 3 ++- src/alerts/alert_types.rs | 20 ++++++++++++++++++-- src/alerts/mod.rs | 37 ++++++++++++++++++++++++++++++++----- 5 files changed, 101 insertions(+), 12 deletions(-) diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs index 094cadfba..aea1f6efe 100644 --- a/src/alerts/alert_enums.rs +++ b/src/alerts/alert_enums.rs @@ -116,6 +116,26 @@ impl Display for AlertType { } } +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "camelCase")] +pub enum AlertQueryType { + #[default] + Builder, + #[serde(alias = "sql")] + Code, + Promql, +} + +impl Display for AlertQueryType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertQueryType::Builder => write!(f, "builder"), + AlertQueryType::Code => write!(f, "code"), + AlertQueryType::Promql => write!(f, "promql"), + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum AlertOperator { diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 132f08d79..59e550da5 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -28,13 +28,14 @@ use crate::{ alerts::{ AlertError, CURRENT_ALERTS_VERSION, alert_enums::{ - AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, - LogicalOperator, NotificationState, Severity, WhereConfigOperator, + AlertOperator, AlertQueryType, AlertState, AlertTask, AlertType, AlertVersion, + EvalConfig, LogicalOperator, NotificationState, Severity, WhereConfigOperator, }, alert_traits::AlertTrait, target::{NotificationConfig, TARGETS}, }, metastore::metastore_traits::MetastoreObject, + parseable::PARSEABLE, query::resolve_stream_names, storage::object_storage::{alert_json_path, alert_state_json_path, mttr_json_path}, }; @@ -45,6 +46,10 @@ const RESERVED_FIELDS: &[&str] = &[ "severity", "title", "query", + "queryType", + "query_type", + "creationType", + "creation_type", "datasets", "alertType", "alert_type", @@ -282,6 +287,10 @@ pub struct AlertRequest { pub severity: Severity, pub title: String, pub query: String, + #[serde(default)] + pub query_type: AlertQueryType, + #[serde(default)] + pub datasets: Vec, pub alert_type: String, pub anomaly_config: Option, pub forecast_config: Option, @@ -326,11 +335,22 @@ impl AlertRequest { for id in &self.targets { TARGETS.get_target_by_id(id, &tenant_id).await?; } - let datasets = resolve_stream_names(&self.query)?; + let datasets = match self.query_type { + AlertQueryType::Builder | AlertQueryType::Code => resolve_stream_names(&self.query)?, + AlertQueryType::Promql => self.datasets, + }; if datasets.len() != 1 { return Err(AlertError::ValidationFailure(format!( - "Query should include only one dataset. Found: {datasets:?}" + "Alert should include only one dataset. Found: {datasets:?}" + ))); + } + if self.query_type == AlertQueryType::Promql + && !PARSEABLE.check_or_load_stream(&datasets[0], &tenant_id).await + { + return Err(AlertError::ValidationFailure(format!( + "Invalid PromQL metrics stream: {}", + datasets[0] ))); } @@ -342,6 +362,7 @@ impl AlertRequest { severity: self.severity, title: self.title, query: self.query, + query_type: self.query_type, datasets, alert_type: { match self.alert_type.as_str() { @@ -393,6 +414,8 @@ pub struct AlertConfig { pub severity: Severity, pub title: String, pub query: String, + #[serde(default)] + pub query_type: AlertQueryType, pub datasets: Vec, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, @@ -420,6 +443,7 @@ pub struct AlertConfigResponse { pub severity: Severity, pub title: String, pub query: String, + pub query_type: AlertQueryType, pub datasets: Vec, pub alert_type: &'static str, pub anomaly_config: Option, @@ -466,6 +490,7 @@ impl AlertConfig { severity: self.severity, title: self.title, query: self.query, + query_type: self.query_type, datasets: self.datasets, alert_type: { match self.alert_type { diff --git a/src/alerts/alert_traits.rs b/src/alerts/alert_traits.rs index fdce7b26c..46be550a8 100644 --- a/src/alerts/alert_traits.rs +++ b/src/alerts/alert_traits.rs @@ -18,7 +18,7 @@ use crate::{ alerts::{ - AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity, + AlertConfig, AlertError, AlertQueryType, AlertState, AlertType, EvalConfig, Severity, alert_enums::NotificationState, alert_structs::{Context, ThresholdConfig}, }, @@ -64,6 +64,7 @@ pub trait AlertTrait: Debug + Send + Sync + MetastoreObject { fn get_severity(&self) -> &Severity; fn get_title(&self) -> &str; fn get_query(&self) -> &str; + fn get_query_type(&self) -> AlertQueryType; fn get_alert_type(&self) -> &AlertType; fn get_threshold_config(&self) -> &ThresholdConfig; fn get_eval_config(&self) -> &EvalConfig; diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 9c6c0e48d..cbc8083cb 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -27,8 +27,8 @@ use ulid::Ulid; use crate::{ alerts::{ - AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, - ThresholdConfig, + AlertConfig, AlertError, AlertQueryType, AlertState, AlertType, AlertVersion, EvalConfig, + Severity, ThresholdConfig, alert_enums::NotificationState, alert_structs::{AlertStateEntry, GroupResult}, alert_traits::{AlertTrait, MessageCreation}, @@ -58,6 +58,8 @@ pub struct ThresholdAlert { pub severity: Severity, pub title: String, pub query: String, + #[serde(default)] + pub query_type: AlertQueryType, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, pub eval_config: EvalConfig, @@ -89,6 +91,10 @@ impl MetastoreObject for ThresholdAlert { #[async_trait] impl AlertTrait for ThresholdAlert { async fn eval_alert(&self) -> Result, AlertError> { + if self.query_type == AlertQueryType::Promql { + return Err(AlertError::NotPresentInOSS("promql alerts")); + } + let time_range = extract_time_range(&self.eval_config)?; let tenant = self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); @@ -188,6 +194,10 @@ impl AlertTrait for ThresholdAlert { return Err(AlertError::InvalidAlertQuery("Empty query".into())); } + if self.query_type == AlertQueryType::Promql { + return Err(AlertError::NotPresentInOSS("promql alerts")); + } + let tables = resolve_stream_names(&self.query)?; if tables.is_empty() { return Err(AlertError::InvalidAlertQuery( @@ -315,6 +325,10 @@ impl AlertTrait for ThresholdAlert { &self.query } + fn get_query_type(&self) -> AlertQueryType { + self.query_type + } + fn get_severity(&self) -> &Severity { &self.severity } @@ -436,6 +450,7 @@ impl From for ThresholdAlert { severity: value.severity, title: value.title, query: value.query, + query_type: value.query_type, alert_type: value.alert_type, threshold_config: value.threshold_config, eval_config: value.eval_config, @@ -461,6 +476,7 @@ impl From for AlertConfig { severity: val.severity, title: val.title, query: val.query, + query_type: val.query_type, alert_type: val.alert_type, threshold_config: val.threshold_config, eval_config: val.eval_config, diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 1d1b90999..ac5bb6aee 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -45,8 +45,8 @@ pub mod alerts_utils; pub mod target; pub use crate::alerts::alert_enums::{ - AggregateFunction, AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, - LogicalOperator, NotificationState, Severity, WhereConfigOperator, + AggregateFunction, AlertOperator, AlertQueryType, AlertState, AlertTask, AlertType, + AlertVersion, EvalConfig, LogicalOperator, NotificationState, Severity, WhereConfigOperator, }; pub use crate::alerts::alert_structs::{ AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState, @@ -61,6 +61,7 @@ use crate::metastore::MetastoreError; use crate::parseable::{DEFAULT_TENANT, PARSEABLE, StreamNotFound}; use crate::query::{QUERY_SESSION, resolve_stream_names}; use crate::rbac::map::{SessionKey, sessions}; +use crate::rbac::{Response, Users, role::Action}; use crate::sse::{SSE_HANDLER, SSEAlertInfo, SSEEvent}; use crate::storage; use crate::storage::ObjectStorageError; @@ -102,6 +103,31 @@ pub fn create_default_alerts_manager() -> Alerts { alerts } +pub async fn user_auth_for_alert_config( + session: &SessionKey, + alert: &AlertConfig, +) -> Result<(), actix_web::Error> { + match alert.query_type { + AlertQueryType::Builder | AlertQueryType::Code => user_auth_for_query(session, &alert.query).await, + AlertQueryType::Promql => { + let [dataset] = alert.datasets.as_slice() else { + return Err(actix_web::error::ErrorUnauthorized( + "User does not have access to PromQL alert stream", + )); + }; + + if Users.authorize(session.clone(), Action::Query, Some(dataset), None) + != Response::Authorized + { + return Err(actix_web::error::ErrorUnauthorized(format!( + "User does not have access to stream- {dataset}" + ))); + } + Ok(()) + } + } +} + impl AlertConfig { /// Migration function to convert v1 alerts to v2 structure pub async fn migrate_from_v1( @@ -125,6 +151,7 @@ impl AlertConfig { severity: basic_fields.severity, title: basic_fields.title, query, + query_type: AlertQueryType::Builder, datasets, alert_type: AlertType::Threshold, threshold_config, @@ -627,7 +654,7 @@ impl AlertConfig { let active_session = sessions().get_active_sessions(); let mut broadcast_to = vec![]; for (session, _, _) in active_session { - if user_auth_for_query(&session, &self.query).await.is_ok() + if user_auth_for_alert_config(&session, self).await.is_ok() && let SessionKey::SessionId(id) = &session { broadcast_to.push(*id); @@ -1193,7 +1220,7 @@ impl AlertManagerTrait for Alerts { let futures: Vec<_> = all_alerts .into_iter() .map(|alert| async { - if user_auth_for_query(&session.clone(), &alert.query) + if user_auth_for_alert_config(&session.clone(), &alert) .await .is_ok() { @@ -1214,7 +1241,7 @@ impl AlertManagerTrait for Alerts { let futures: Vec<_> = all_alerts .into_iter() .map(|alert| async { - if user_auth_for_query(&session, &alert.query).await.is_ok() { + if user_auth_for_alert_config(&session, &alert).await.is_ok() { Some(alert) } else { None From ff00f378b3b231bc3b2b8aa381c49882c55177a3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 28 Jun 2026 18:12:58 +0700 Subject: [PATCH 2/6] fix get alert --- src/alerts/alert_enums.rs | 62 +++++++++++++++++++++++++++++++++++++ src/handlers/http/alerts.rs | 21 +++++++------ 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs index aea1f6efe..699eeb83e 100644 --- a/src/alerts/alert_enums.rs +++ b/src/alerts/alert_enums.rs @@ -231,6 +231,68 @@ impl Display for WhereConfigOperator { } } +#[cfg(test)] +mod tests { + use super::AlertQueryType; + + #[test] + fn alert_query_type_deserializes_supported_modes() { + assert_eq!( + serde_json::from_str::("\"builder\"").unwrap(), + AlertQueryType::Builder + ); + assert_eq!( + serde_json::from_str::("\"code\"").unwrap(), + AlertQueryType::Code + ); + assert_eq!( + serde_json::from_str::("\"promql\"").unwrap(), + AlertQueryType::Promql + ); + } + + #[test] + fn alert_query_type_accepts_legacy_sql_as_code() { + assert_eq!( + serde_json::from_str::("\"sql\"").unwrap(), + AlertQueryType::Code + ); + } + + #[test] + fn alert_query_type_rejects_unknown_mode() { + assert!(serde_json::from_str::("\"rawSql\"").is_err()); + } + + #[test] + fn alert_request_deserializes_promql_query_type() { + let request: crate::alerts::alert_structs::AlertRequest = serde_json::from_value( + serde_json::json!({ + "severity": "high", + "title": "Test alert", + "alertType": "threshold", + "queryType": "promql", + "query": "sum({\"k8s.pod.cpu.usage\"}) by (\"k8s.namespace.name\")", + "thresholdConfig": {"operator": ">", "value": -1.0}, + "evalConfig": { + "rollingWindow": { + "evalStart": "10 minutes", + "evalEnd": "now", + "evalFrequency": 10 + } + }, + "targets": [], + "notificationConfig": {"interval": 1}, + "datasets": ["azure-prod-cluster-metrics"] + }), + ) + .unwrap(); + + assert_eq!(request.query_type, AlertQueryType::Promql); + assert_eq!(request.datasets, vec!["azure-prod-cluster-metrics"]); + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum AggregateFunction { diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index db5ccc9aa..5781f3bd4 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -20,7 +20,7 @@ use std::{collections::HashMap, str::FromStr}; use crate::{ alerts::{ - ALERTS, AlertError, AlertState, Severity, + ALERTS, AlertError, AlertState, Severity, user_auth_for_alert_config, alert_enums::{AlertType, NotificationState}, alert_structs::{AlertConfig, AlertRequest, AlertStateEntry, NotificationStateRequest}, alert_traits::AlertTrait, @@ -29,7 +29,7 @@ use crate::{ }, metastore::metastore_traits::MetastoreObject, parseable::PARSEABLE, - utils::{actix::extract_session_key_from_req, get_tenant_id_from_request, user_auth_for_query}, + utils::{actix::extract_session_key_from_req, get_tenant_id_from_request}, }; use actix_web::{ HttpRequest, Responder, @@ -343,7 +343,7 @@ pub async fn get(req: HttpRequest, alert_id: Path) -> Result) -> Result Date: Sun, 28 Jun 2026 22:20:00 +0700 Subject: [PATCH 3/6] fmt fix --- src/alerts/alert_enums.rs | 9 ++++----- src/alerts/alert_structs.rs | 4 +++- src/alerts/mod.rs | 4 +++- src/handlers/http/alerts.rs | 3 ++- src/lib.rs | 2 +- src/metastore/metastore_traits.rs | 2 +- 6 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs index 699eeb83e..5824d8b84 100644 --- a/src/alerts/alert_enums.rs +++ b/src/alerts/alert_enums.rs @@ -266,8 +266,8 @@ mod tests { #[test] fn alert_request_deserializes_promql_query_type() { - let request: crate::alerts::alert_structs::AlertRequest = serde_json::from_value( - serde_json::json!({ + let request: crate::alerts::alert_structs::AlertRequest = + serde_json::from_value(serde_json::json!({ "severity": "high", "title": "Test alert", "alertType": "threshold", @@ -284,9 +284,8 @@ mod tests { "targets": [], "notificationConfig": {"interval": 1}, "datasets": ["azure-prod-cluster-metrics"] - }), - ) - .unwrap(); + })) + .unwrap(); assert_eq!(request.query_type, AlertQueryType::Promql); assert_eq!(request.datasets, vec!["azure-prod-cluster-metrics"]); diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 59e550da5..5fa465195 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -346,7 +346,9 @@ impl AlertRequest { ))); } if self.query_type == AlertQueryType::Promql - && !PARSEABLE.check_or_load_stream(&datasets[0], &tenant_id).await + && !PARSEABLE + .check_or_load_stream(&datasets[0], &tenant_id) + .await { return Err(AlertError::ValidationFailure(format!( "Invalid PromQL metrics stream: {}", diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index ac5bb6aee..d2eab58de 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -108,7 +108,9 @@ pub async fn user_auth_for_alert_config( alert: &AlertConfig, ) -> Result<(), actix_web::Error> { match alert.query_type { - AlertQueryType::Builder | AlertQueryType::Code => user_auth_for_query(session, &alert.query).await, + AlertQueryType::Builder | AlertQueryType::Code => { + user_auth_for_query(session, &alert.query).await + } AlertQueryType::Promql => { let [dataset] = alert.datasets.as_slice() else { return Err(actix_web::error::ErrorUnauthorized( diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 5781f3bd4..6c404127c 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -20,12 +20,13 @@ use std::{collections::HashMap, str::FromStr}; use crate::{ alerts::{ - ALERTS, AlertError, AlertState, Severity, user_auth_for_alert_config, + ALERTS, AlertError, AlertState, Severity, alert_enums::{AlertType, NotificationState}, alert_structs::{AlertConfig, AlertRequest, AlertStateEntry, NotificationStateRequest}, alert_traits::AlertTrait, alert_types::ThresholdAlert, target::Retry, + user_auth_for_alert_config, }, metastore::metastore_traits::MetastoreObject, parseable::PARSEABLE, diff --git a/src/lib.rs b/src/lib.rs index 21030b7b0..94f65c034 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,8 +68,8 @@ use once_cell::sync::Lazy; pub use openid; use parseable::PARSEABLE; use reqwest::{Client, ClientBuilder}; +pub use {clap, tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk}; -pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber, clap}; // It is very unlikely that panic will occur when dealing with locks. pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock"; diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 4000ccee5..9eb6d3190 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -229,7 +229,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { stream_name: &str, get_base: bool, tenant_id: &Option, - is_migration: bool + is_migration: bool, ) -> Result; async fn put_stream_json( &self, From 805e9bc9b1d635cb296ccecc496bca0c81f9eb5a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 28 Jun 2026 22:42:53 +0700 Subject: [PATCH 4/6] resolve comments --- src/alerts/alert_types.rs | 16 ++++++++++------ src/alerts/mod.rs | 26 +++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index cbc8083cb..1230246a8 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -91,9 +91,7 @@ impl MetastoreObject for ThresholdAlert { #[async_trait] impl AlertTrait for ThresholdAlert { async fn eval_alert(&self) -> Result, AlertError> { - if self.query_type == AlertQueryType::Promql { - return Err(AlertError::NotPresentInOSS("promql alerts")); - } + self.validate_oss_query_type()?; let time_range = extract_time_range(&self.eval_config)?; @@ -194,9 +192,7 @@ impl AlertTrait for ThresholdAlert { return Err(AlertError::InvalidAlertQuery("Empty query".into())); } - if self.query_type == AlertQueryType::Promql { - return Err(AlertError::NotPresentInOSS("promql alerts")); - } + self.validate_oss_query_type()?; let tables = resolve_stream_names(&self.query)?; if tables.is_empty() { @@ -495,6 +491,14 @@ impl From for AlertConfig { } impl ThresholdAlert { + pub(crate) fn validate_oss_query_type(&self) -> Result<(), AlertError> { + if self.query_type == AlertQueryType::Promql { + return Err(AlertError::NotPresentInOSS("promql alerts")); + } + + Ok(()) + } + fn create_group_message(&self, breached_groups: &[GroupResult]) -> Result { let header = self.get_message_header()?; let mut message = format!("{header}\n"); diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index d2eab58de..197249951 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -93,6 +93,16 @@ pub async fn set_alert_manager(manager: Arc) { *ALERTS.write().await = Some(manager); } +fn ensure_schedulable_in_oss(alert: &dyn AlertTrait) -> Result<(), AlertError> { + if matches!(alert.get_alert_type(), AlertType::Threshold) + && alert.get_query_type() == AlertQueryType::Promql + { + return Err(AlertError::NotPresentInOSS("promql alerts")); + } + + Ok(()) +} + pub fn create_default_alerts_manager() -> Alerts { let (tx, rx) = mpsc::channel::(1000); let alerts = Alerts { @@ -1147,7 +1157,19 @@ impl AlertManagerTrait for Alerts { let alert: Box = match &alert.alert_type { AlertType::Threshold => { - Box::new(ThresholdAlert::from(alert)) as Box + let alert = ThresholdAlert::from(alert); + if let Err(e) = alert.validate_oss_query_type() { + warn!( + "Skipping unsupported alert task for alert {}: {e}", + alert.id + ); + map.entry(tenant_id.clone()) + .or_default() + .insert(alert.id, Box::new(alert) as Box); + continue; + } + + Box::new(alert) as Box } AlertType::Anomaly(_) => { return Err(anyhow::Error::msg( @@ -1366,6 +1388,7 @@ impl AlertManagerTrait for Alerts { .await .map_err(|e| AlertError::CustomError(e.to_string()))?; } else if should_create_task { + ensure_schedulable_in_oss(alert.as_ref())?; self.sender .send(AlertTask::Create(alert.clone_box())) .await @@ -1464,6 +1487,7 @@ impl AlertManagerTrait for Alerts { /// Start a scheduled alert task async fn start_task(&self, alert: Box) -> Result<(), AlertError> { + ensure_schedulable_in_oss(alert.as_ref())?; self.sender .send(AlertTask::Create(alert)) .await From 4cc0f7ad05e84059b21b5a06b4eb74a3b035ef77 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 28 Jun 2026 23:20:09 +0700 Subject: [PATCH 5/6] deepsource fix --- src/alerts/mod.rs | 143 +++++++++++++++++++++++----------------------- 1 file changed, 73 insertions(+), 70 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 197249951..e988ded5f 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -103,6 +103,65 @@ fn ensure_schedulable_in_oss(alert: &dyn AlertTrait) -> Result<(), AlertError> { Ok(()) } +async fn parse_alert_config(alert_bytes: &[u8], tenant: &Option) -> Option { + let json_value: JsonValue = match serde_json::from_slice(alert_bytes) { + Ok(val) => val, + Err(e) => { + error!("Failed to parse alert JSON: {e}"); + return None; + } + }; + + match json_value["version"].as_str() { + Some("v1") => migrate_v1_alert(&json_value, tenant, "Failed to migrate v1 alert").await, + Some(_) if json_value["query"].is_null() || json_value.get("stream").is_some() => { + migrate_v1_alert(&json_value, tenant, "Failed to migrate v1 alert").await + } + Some(_) => match serde_json::from_value::(json_value) { + Ok(alert) => Some(alert), + Err(e) => { + error!("Failed to parse v2 alert: {e}"); + None + } + }, + None => { + warn!("Found alert without version field, assuming v1 and migrating"); + migrate_v1_alert( + &json_value, + tenant, + "Failed to migrate alert without version", + ) + .await + } + } +} + +async fn migrate_v1_alert( + json_value: &JsonValue, + tenant: &Option, + error_context: &str, +) -> Option { + match AlertConfig::migrate_from_v1(json_value, tenant).await { + Ok(migrated) => Some(migrated), + Err(e) => { + error!("{error_context}: {e}"); + None + } + } +} + +fn alert_from_config_oss(alert: AlertConfig) -> anyhow::Result> { + match &alert.alert_type { + AlertType::Threshold => Ok(Box::new(ThresholdAlert::from(alert)) as Box), + AlertType::Anomaly(_) => Err(anyhow::Error::msg( + AlertError::NotPresentInOSS("anomaly").to_string(), + )), + AlertType::Forecast(_) => Err(anyhow::Error::msg( + AlertError::NotPresentInOSS("forecast").to_string(), + )), + } +} + pub fn create_default_alerts_manager() -> Alerts { let (tx, rx) = mpsc::channel::(1000); let alerts = Alerts { @@ -1107,81 +1166,14 @@ impl AlertManagerTrait for Alerts { &Some(tenant_id.clone()) }; for alert_bytes in raw_bytes { - // First, try to parse as JSON Value to check version - let json_value: JsonValue = match serde_json::from_slice(&alert_bytes) { - Ok(val) => val, - Err(e) => { - error!("Failed to parse alert JSON: {e}"); - continue; - } - }; - - // Check version and handle migration - let mut alert = if let Some(version_str) = json_value["version"].as_str() { - if version_str == "v1" - || json_value["query"].is_null() - || json_value.get("stream").is_some() - { - // This is a v1 alert that needs migration - match AlertConfig::migrate_from_v1(&json_value, tenant).await { - Ok(migrated) => migrated, - Err(e) => { - error!("Failed to migrate v1 alert: {e}"); - continue; - } - } - } else { - // Try to parse as v2 - match serde_json::from_value::(json_value) { - Ok(alert) => alert, - Err(e) => { - error!("Failed to parse v2 alert: {e}"); - continue; - } - } - } - } else { - // No version field, assume v1 and migrate - warn!("Found alert without version field, assuming v1 and migrating"); - match AlertConfig::migrate_from_v1(&json_value, tenant).await { - Ok(migrated) => migrated, - Err(e) => { - error!("Failed to migrate alert without version: {e}"); - continue; - } - } + let Some(mut alert) = parse_alert_config(&alert_bytes, tenant).await else { + continue; }; // ensure that alert config's tenant is correctly set alert.tenant_id.clone_from(tenant); - let alert: Box = match &alert.alert_type { - AlertType::Threshold => { - let alert = ThresholdAlert::from(alert); - if let Err(e) = alert.validate_oss_query_type() { - warn!( - "Skipping unsupported alert task for alert {}: {e}", - alert.id - ); - map.entry(tenant_id.clone()) - .or_default() - .insert(alert.id, Box::new(alert) as Box); - continue; - } - - Box::new(alert) as Box - } - AlertType::Anomaly(_) => { - return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("anomaly").to_string(), - )); - } - AlertType::Forecast(_) => { - return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("forecast").to_string(), - )); - } - }; + let alert = alert_from_config_oss(alert)?; // Create alert task iff alert's state is not paused if alert.get_state().eq(&AlertState::Disabled) { @@ -1191,6 +1183,17 @@ impl AlertManagerTrait for Alerts { continue; } + if let Err(e) = ensure_schedulable_in_oss(alert.as_ref()) { + warn!( + "Skipping unsupported alert task for alert {}: {e}", + alert.get_id() + ); + map.entry(tenant_id.clone()) + .or_default() + .insert(*alert.get_id(), alert); + continue; + } + match self.sender.send(AlertTask::Create(alert.clone_box())).await { Ok(_) => {} Err(e) => { From 39d85b446635f7e88a6771b5a2f8710eb446010a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 28 Jun 2026 23:27:19 +0700 Subject: [PATCH 6/6] skip loading anomaly or forecast alerts --- src/alerts/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index e988ded5f..b7196920b 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -150,15 +150,11 @@ async fn migrate_v1_alert( } } -fn alert_from_config_oss(alert: AlertConfig) -> anyhow::Result> { +fn alert_from_config_oss(alert: AlertConfig) -> Result, AlertError> { match &alert.alert_type { AlertType::Threshold => Ok(Box::new(ThresholdAlert::from(alert)) as Box), - AlertType::Anomaly(_) => Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("anomaly").to_string(), - )), - AlertType::Forecast(_) => Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("forecast").to_string(), - )), + AlertType::Anomaly(_) => Err(AlertError::NotPresentInOSS("anomaly")), + AlertType::Forecast(_) => Err(AlertError::NotPresentInOSS("forecast")), } } @@ -1173,7 +1169,14 @@ impl AlertManagerTrait for Alerts { // ensure that alert config's tenant is correctly set alert.tenant_id.clone_from(tenant); - let alert = alert_from_config_oss(alert)?; + let alert = match alert_from_config_oss(alert) { + Ok(alert) => alert, + Err(e @ AlertError::NotPresentInOSS(_)) => { + warn!("Skipping unsupported OSS alert: {e}"); + continue; + } + Err(e) => return Err(anyhow::Error::msg(e.to_string())), + }; // Create alert task iff alert's state is not paused if alert.get_state().eq(&AlertState::Disabled) {