diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs index 094cadfba..5824d8b84 100644 --- a/src/alerts/alert_enums.rs +++ b/src/alerts/alert_enums.rs @@ -116,6 +116,26 @@ impl Display for AlertType { } } +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "camelCase")] +pub enum AlertQueryType { + #[default] + Builder, + #[serde(alias = "sql")] + Code, + Promql, +} + +impl Display for AlertQueryType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertQueryType::Builder => write!(f, "builder"), + AlertQueryType::Code => write!(f, "code"), + AlertQueryType::Promql => write!(f, "promql"), + } + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum AlertOperator { @@ -211,6 +231,67 @@ impl Display for WhereConfigOperator { } } +#[cfg(test)] +mod tests { + use super::AlertQueryType; + + #[test] + fn alert_query_type_deserializes_supported_modes() { + assert_eq!( + serde_json::from_str::("\"builder\"").unwrap(), + AlertQueryType::Builder + ); + assert_eq!( + serde_json::from_str::("\"code\"").unwrap(), + AlertQueryType::Code + ); + assert_eq!( + serde_json::from_str::("\"promql\"").unwrap(), + AlertQueryType::Promql + ); + } + + #[test] + fn alert_query_type_accepts_legacy_sql_as_code() { + assert_eq!( + serde_json::from_str::("\"sql\"").unwrap(), + AlertQueryType::Code + ); + } + + #[test] + fn alert_query_type_rejects_unknown_mode() { + assert!(serde_json::from_str::("\"rawSql\"").is_err()); + } + + #[test] + fn alert_request_deserializes_promql_query_type() { + let request: crate::alerts::alert_structs::AlertRequest = + serde_json::from_value(serde_json::json!({ + "severity": "high", + "title": "Test alert", + "alertType": "threshold", + "queryType": "promql", + "query": "sum({\"k8s.pod.cpu.usage\"}) by (\"k8s.namespace.name\")", + "thresholdConfig": {"operator": ">", "value": -1.0}, + "evalConfig": { + "rollingWindow": { + "evalStart": "10 minutes", + "evalEnd": "now", + "evalFrequency": 10 + } + }, + "targets": [], + "notificationConfig": {"interval": 1}, + "datasets": ["azure-prod-cluster-metrics"] + })) + .unwrap(); + + assert_eq!(request.query_type, AlertQueryType::Promql); + assert_eq!(request.datasets, vec!["azure-prod-cluster-metrics"]); + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub enum AggregateFunction { diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 132f08d79..5fa465195 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -28,13 +28,14 @@ use crate::{ alerts::{ AlertError, CURRENT_ALERTS_VERSION, alert_enums::{ - AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, - LogicalOperator, NotificationState, Severity, WhereConfigOperator, + AlertOperator, AlertQueryType, AlertState, AlertTask, AlertType, AlertVersion, + EvalConfig, LogicalOperator, NotificationState, Severity, WhereConfigOperator, }, alert_traits::AlertTrait, target::{NotificationConfig, TARGETS}, }, metastore::metastore_traits::MetastoreObject, + parseable::PARSEABLE, query::resolve_stream_names, storage::object_storage::{alert_json_path, alert_state_json_path, mttr_json_path}, }; @@ -45,6 +46,10 @@ const RESERVED_FIELDS: &[&str] = &[ "severity", "title", "query", + "queryType", + "query_type", + "creationType", + "creation_type", "datasets", "alertType", "alert_type", @@ -282,6 +287,10 @@ pub struct AlertRequest { pub severity: Severity, pub title: String, pub query: String, + #[serde(default)] + pub query_type: AlertQueryType, + #[serde(default)] + pub datasets: Vec, pub alert_type: String, pub anomaly_config: Option, pub forecast_config: Option, @@ -326,11 +335,24 @@ impl AlertRequest { for id in &self.targets { TARGETS.get_target_by_id(id, &tenant_id).await?; } - let datasets = resolve_stream_names(&self.query)?; + let datasets = match self.query_type { + AlertQueryType::Builder | AlertQueryType::Code => resolve_stream_names(&self.query)?, + AlertQueryType::Promql => self.datasets, + }; if datasets.len() != 1 { return Err(AlertError::ValidationFailure(format!( - "Query should include only one dataset. Found: {datasets:?}" + "Alert should include only one dataset. Found: {datasets:?}" + ))); + } + if self.query_type == AlertQueryType::Promql + && !PARSEABLE + .check_or_load_stream(&datasets[0], &tenant_id) + .await + { + return Err(AlertError::ValidationFailure(format!( + "Invalid PromQL metrics stream: {}", + datasets[0] ))); } @@ -342,6 +364,7 @@ impl AlertRequest { severity: self.severity, title: self.title, query: self.query, + query_type: self.query_type, datasets, alert_type: { match self.alert_type.as_str() { @@ -393,6 +416,8 @@ pub struct AlertConfig { pub severity: Severity, pub title: String, pub query: String, + #[serde(default)] + pub query_type: AlertQueryType, pub datasets: Vec, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, @@ -420,6 +445,7 @@ pub struct AlertConfigResponse { pub severity: Severity, pub title: String, pub query: String, + pub query_type: AlertQueryType, pub datasets: Vec, pub alert_type: &'static str, pub anomaly_config: Option, @@ -466,6 +492,7 @@ impl AlertConfig { severity: self.severity, title: self.title, query: self.query, + query_type: self.query_type, datasets: self.datasets, alert_type: { match self.alert_type { diff --git a/src/alerts/alert_traits.rs b/src/alerts/alert_traits.rs index fdce7b26c..46be550a8 100644 --- a/src/alerts/alert_traits.rs +++ b/src/alerts/alert_traits.rs @@ -18,7 +18,7 @@ use crate::{ alerts::{ - AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity, + AlertConfig, AlertError, AlertQueryType, AlertState, AlertType, EvalConfig, Severity, alert_enums::NotificationState, alert_structs::{Context, ThresholdConfig}, }, @@ -64,6 +64,7 @@ pub trait AlertTrait: Debug + Send + Sync + MetastoreObject { fn get_severity(&self) -> &Severity; fn get_title(&self) -> &str; fn get_query(&self) -> &str; + fn get_query_type(&self) -> AlertQueryType; fn get_alert_type(&self) -> &AlertType; fn get_threshold_config(&self) -> &ThresholdConfig; fn get_eval_config(&self) -> &EvalConfig; diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 9c6c0e48d..1230246a8 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -27,8 +27,8 @@ use ulid::Ulid; use crate::{ alerts::{ - AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, - ThresholdConfig, + AlertConfig, AlertError, AlertQueryType, AlertState, AlertType, AlertVersion, EvalConfig, + Severity, ThresholdConfig, alert_enums::NotificationState, alert_structs::{AlertStateEntry, GroupResult}, alert_traits::{AlertTrait, MessageCreation}, @@ -58,6 +58,8 @@ pub struct ThresholdAlert { pub severity: Severity, pub title: String, pub query: String, + #[serde(default)] + pub query_type: AlertQueryType, pub alert_type: AlertType, pub threshold_config: ThresholdConfig, pub eval_config: EvalConfig, @@ -89,6 +91,8 @@ impl MetastoreObject for ThresholdAlert { #[async_trait] impl AlertTrait for ThresholdAlert { async fn eval_alert(&self) -> Result, AlertError> { + self.validate_oss_query_type()?; + let time_range = extract_time_range(&self.eval_config)?; let tenant = self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); @@ -188,6 +192,8 @@ impl AlertTrait for ThresholdAlert { return Err(AlertError::InvalidAlertQuery("Empty query".into())); } + self.validate_oss_query_type()?; + let tables = resolve_stream_names(&self.query)?; if tables.is_empty() { return Err(AlertError::InvalidAlertQuery( @@ -315,6 +321,10 @@ impl AlertTrait for ThresholdAlert { &self.query } + fn get_query_type(&self) -> AlertQueryType { + self.query_type + } + fn get_severity(&self) -> &Severity { &self.severity } @@ -436,6 +446,7 @@ impl From for ThresholdAlert { severity: value.severity, title: value.title, query: value.query, + query_type: value.query_type, alert_type: value.alert_type, threshold_config: value.threshold_config, eval_config: value.eval_config, @@ -461,6 +472,7 @@ impl From for AlertConfig { severity: val.severity, title: val.title, query: val.query, + query_type: val.query_type, alert_type: val.alert_type, threshold_config: val.threshold_config, eval_config: val.eval_config, @@ -479,6 +491,14 @@ impl From for AlertConfig { } impl ThresholdAlert { + pub(crate) fn validate_oss_query_type(&self) -> Result<(), AlertError> { + if self.query_type == AlertQueryType::Promql { + return Err(AlertError::NotPresentInOSS("promql alerts")); + } + + Ok(()) + } + fn create_group_message(&self, breached_groups: &[GroupResult]) -> Result { let header = self.get_message_header()?; let mut message = format!("{header}\n"); diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 1d1b90999..b7196920b 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -45,8 +45,8 @@ pub mod alerts_utils; pub mod target; pub use crate::alerts::alert_enums::{ - AggregateFunction, AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, - LogicalOperator, NotificationState, Severity, WhereConfigOperator, + AggregateFunction, AlertOperator, AlertQueryType, AlertState, AlertTask, AlertType, + AlertVersion, EvalConfig, LogicalOperator, NotificationState, Severity, WhereConfigOperator, }; pub use crate::alerts::alert_structs::{ AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState, @@ -61,6 +61,7 @@ use crate::metastore::MetastoreError; use crate::parseable::{DEFAULT_TENANT, PARSEABLE, StreamNotFound}; use crate::query::{QUERY_SESSION, resolve_stream_names}; use crate::rbac::map::{SessionKey, sessions}; +use crate::rbac::{Response, Users, role::Action}; use crate::sse::{SSE_HANDLER, SSEAlertInfo, SSEEvent}; use crate::storage; use crate::storage::ObjectStorageError; @@ -92,6 +93,71 @@ pub async fn set_alert_manager(manager: Arc) { *ALERTS.write().await = Some(manager); } +fn ensure_schedulable_in_oss(alert: &dyn AlertTrait) -> Result<(), AlertError> { + if matches!(alert.get_alert_type(), AlertType::Threshold) + && alert.get_query_type() == AlertQueryType::Promql + { + return Err(AlertError::NotPresentInOSS("promql alerts")); + } + + Ok(()) +} + +async fn parse_alert_config(alert_bytes: &[u8], tenant: &Option) -> Option { + let json_value: JsonValue = match serde_json::from_slice(alert_bytes) { + Ok(val) => val, + Err(e) => { + error!("Failed to parse alert JSON: {e}"); + return None; + } + }; + + match json_value["version"].as_str() { + Some("v1") => migrate_v1_alert(&json_value, tenant, "Failed to migrate v1 alert").await, + Some(_) if json_value["query"].is_null() || json_value.get("stream").is_some() => { + migrate_v1_alert(&json_value, tenant, "Failed to migrate v1 alert").await + } + Some(_) => match serde_json::from_value::(json_value) { + Ok(alert) => Some(alert), + Err(e) => { + error!("Failed to parse v2 alert: {e}"); + None + } + }, + None => { + warn!("Found alert without version field, assuming v1 and migrating"); + migrate_v1_alert( + &json_value, + tenant, + "Failed to migrate alert without version", + ) + .await + } + } +} + +async fn migrate_v1_alert( + json_value: &JsonValue, + tenant: &Option, + error_context: &str, +) -> Option { + match AlertConfig::migrate_from_v1(json_value, tenant).await { + Ok(migrated) => Some(migrated), + Err(e) => { + error!("{error_context}: {e}"); + None + } + } +} + +fn alert_from_config_oss(alert: AlertConfig) -> Result, AlertError> { + match &alert.alert_type { + AlertType::Threshold => Ok(Box::new(ThresholdAlert::from(alert)) as Box), + AlertType::Anomaly(_) => Err(AlertError::NotPresentInOSS("anomaly")), + AlertType::Forecast(_) => Err(AlertError::NotPresentInOSS("forecast")), + } +} + pub fn create_default_alerts_manager() -> Alerts { let (tx, rx) = mpsc::channel::(1000); let alerts = Alerts { @@ -102,6 +168,33 @@ pub fn create_default_alerts_manager() -> Alerts { alerts } +pub async fn user_auth_for_alert_config( + session: &SessionKey, + alert: &AlertConfig, +) -> Result<(), actix_web::Error> { + match alert.query_type { + AlertQueryType::Builder | AlertQueryType::Code => { + user_auth_for_query(session, &alert.query).await + } + AlertQueryType::Promql => { + let [dataset] = alert.datasets.as_slice() else { + return Err(actix_web::error::ErrorUnauthorized( + "User does not have access to PromQL alert stream", + )); + }; + + if Users.authorize(session.clone(), Action::Query, Some(dataset), None) + != Response::Authorized + { + return Err(actix_web::error::ErrorUnauthorized(format!( + "User does not have access to stream- {dataset}" + ))); + } + Ok(()) + } + } +} + impl AlertConfig { /// Migration function to convert v1 alerts to v2 structure pub async fn migrate_from_v1( @@ -125,6 +218,7 @@ impl AlertConfig { severity: basic_fields.severity, title: basic_fields.title, query, + query_type: AlertQueryType::Builder, datasets, alert_type: AlertType::Threshold, threshold_config, @@ -627,7 +721,7 @@ impl AlertConfig { let active_session = sessions().get_active_sessions(); let mut broadcast_to = vec![]; for (session, _, _) in active_session { - if user_auth_for_query(&session, &self.query).await.is_ok() + if user_auth_for_alert_config(&session, self).await.is_ok() && let SessionKey::SessionId(id) = &session { broadcast_to.push(*id); @@ -1068,68 +1162,20 @@ impl AlertManagerTrait for Alerts { &Some(tenant_id.clone()) }; for alert_bytes in raw_bytes { - // First, try to parse as JSON Value to check version - let json_value: JsonValue = match serde_json::from_slice(&alert_bytes) { - Ok(val) => val, - Err(e) => { - error!("Failed to parse alert JSON: {e}"); - continue; - } - }; - - // Check version and handle migration - let mut alert = if let Some(version_str) = json_value["version"].as_str() { - if version_str == "v1" - || json_value["query"].is_null() - || json_value.get("stream").is_some() - { - // This is a v1 alert that needs migration - match AlertConfig::migrate_from_v1(&json_value, tenant).await { - Ok(migrated) => migrated, - Err(e) => { - error!("Failed to migrate v1 alert: {e}"); - continue; - } - } - } else { - // Try to parse as v2 - match serde_json::from_value::(json_value) { - Ok(alert) => alert, - Err(e) => { - error!("Failed to parse v2 alert: {e}"); - continue; - } - } - } - } else { - // No version field, assume v1 and migrate - warn!("Found alert without version field, assuming v1 and migrating"); - match AlertConfig::migrate_from_v1(&json_value, tenant).await { - Ok(migrated) => migrated, - Err(e) => { - error!("Failed to migrate alert without version: {e}"); - continue; - } - } + let Some(mut alert) = parse_alert_config(&alert_bytes, tenant).await else { + continue; }; // ensure that alert config's tenant is correctly set alert.tenant_id.clone_from(tenant); - let alert: Box = match &alert.alert_type { - AlertType::Threshold => { - Box::new(ThresholdAlert::from(alert)) as Box - } - AlertType::Anomaly(_) => { - return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("anomaly").to_string(), - )); - } - AlertType::Forecast(_) => { - return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("forecast").to_string(), - )); + let alert = match alert_from_config_oss(alert) { + Ok(alert) => alert, + Err(e @ AlertError::NotPresentInOSS(_)) => { + warn!("Skipping unsupported OSS alert: {e}"); + continue; } + Err(e) => return Err(anyhow::Error::msg(e.to_string())), }; // Create alert task iff alert's state is not paused @@ -1140,6 +1186,17 @@ impl AlertManagerTrait for Alerts { continue; } + if let Err(e) = ensure_schedulable_in_oss(alert.as_ref()) { + warn!( + "Skipping unsupported alert task for alert {}: {e}", + alert.get_id() + ); + map.entry(tenant_id.clone()) + .or_default() + .insert(*alert.get_id(), alert); + continue; + } + match self.sender.send(AlertTask::Create(alert.clone_box())).await { Ok(_) => {} Err(e) => { @@ -1193,7 +1250,7 @@ impl AlertManagerTrait for Alerts { let futures: Vec<_> = all_alerts .into_iter() .map(|alert| async { - if user_auth_for_query(&session.clone(), &alert.query) + if user_auth_for_alert_config(&session.clone(), &alert) .await .is_ok() { @@ -1214,7 +1271,7 @@ impl AlertManagerTrait for Alerts { let futures: Vec<_> = all_alerts .into_iter() .map(|alert| async { - if user_auth_for_query(&session, &alert.query).await.is_ok() { + if user_auth_for_alert_config(&session, &alert).await.is_ok() { Some(alert) } else { None @@ -1337,6 +1394,7 @@ impl AlertManagerTrait for Alerts { .await .map_err(|e| AlertError::CustomError(e.to_string()))?; } else if should_create_task { + ensure_schedulable_in_oss(alert.as_ref())?; self.sender .send(AlertTask::Create(alert.clone_box())) .await @@ -1435,6 +1493,7 @@ impl AlertManagerTrait for Alerts { /// Start a scheduled alert task async fn start_task(&self, alert: Box) -> Result<(), AlertError> { + ensure_schedulable_in_oss(alert.as_ref())?; self.sender .send(AlertTask::Create(alert)) .await diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index db5ccc9aa..6c404127c 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -26,10 +26,11 @@ use crate::{ alert_traits::AlertTrait, alert_types::ThresholdAlert, target::Retry, + user_auth_for_alert_config, }, metastore::metastore_traits::MetastoreObject, parseable::PARSEABLE, - utils::{actix::extract_session_key_from_req, get_tenant_id_from_request, user_auth_for_query}, + utils::{actix::extract_session_key_from_req, get_tenant_id_from_request}, }; use actix_web::{ HttpRequest, Responder, @@ -343,7 +344,7 @@ pub async fn get(req: HttpRequest, alert_id: Path) -> Result) -> Result, - is_migration: bool + is_migration: bool, ) -> Result; async fn put_stream_json( &self,