Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/alerts/alert_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ impl Display for AlertType {
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub enum AlertQueryType {
#[default]
Builder,
#[serde(alias = "sql")]
Code,
Promql,
}

impl Display for AlertQueryType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlertQueryType::Builder => write!(f, "builder"),
AlertQueryType::Code => write!(f, "code"),
AlertQueryType::Promql => write!(f, "promql"),
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum AlertOperator {
Expand Down Expand Up @@ -211,6 +231,67 @@ impl Display for WhereConfigOperator {
}
}

#[cfg(test)]
mod tests {
use super::AlertQueryType;

#[test]
fn alert_query_type_deserializes_supported_modes() {
assert_eq!(
serde_json::from_str::<AlertQueryType>("\"builder\"").unwrap(),
AlertQueryType::Builder
);
assert_eq!(
serde_json::from_str::<AlertQueryType>("\"code\"").unwrap(),
AlertQueryType::Code
);
assert_eq!(
serde_json::from_str::<AlertQueryType>("\"promql\"").unwrap(),
AlertQueryType::Promql
);
}

#[test]
fn alert_query_type_accepts_legacy_sql_as_code() {
assert_eq!(
serde_json::from_str::<AlertQueryType>("\"sql\"").unwrap(),
AlertQueryType::Code
);
}

#[test]
fn alert_query_type_rejects_unknown_mode() {
assert!(serde_json::from_str::<AlertQueryType>("\"rawSql\"").is_err());
}

#[test]
fn alert_request_deserializes_promql_query_type() {
let request: crate::alerts::alert_structs::AlertRequest =
serde_json::from_value(serde_json::json!({
"severity": "high",
"title": "Test alert",
"alertType": "threshold",
"queryType": "promql",
"query": "sum({\"k8s.pod.cpu.usage\"}) by (\"k8s.namespace.name\")",
"thresholdConfig": {"operator": ">", "value": -1.0},
"evalConfig": {
"rollingWindow": {
"evalStart": "10 minutes",
"evalEnd": "now",
"evalFrequency": 10
}
},
"targets": [],
"notificationConfig": {"interval": 1},
"datasets": ["azure-prod-cluster-metrics"]
}))
.unwrap();

assert_eq!(request.query_type, AlertQueryType::Promql);
assert_eq!(request.datasets, vec!["azure-prod-cluster-metrics"]);
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum AggregateFunction {
Expand Down
35 changes: 31 additions & 4 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ use crate::{
alerts::{
AlertError, CURRENT_ALERTS_VERSION,
alert_enums::{
AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig,
LogicalOperator, NotificationState, Severity, WhereConfigOperator,
AlertOperator, AlertQueryType, AlertState, AlertTask, AlertType, AlertVersion,
EvalConfig, LogicalOperator, NotificationState, Severity, WhereConfigOperator,
},
alert_traits::AlertTrait,
target::{NotificationConfig, TARGETS},
},
metastore::metastore_traits::MetastoreObject,
parseable::PARSEABLE,
query::resolve_stream_names,
storage::object_storage::{alert_json_path, alert_state_json_path, mttr_json_path},
};
Expand All @@ -45,6 +46,10 @@ const RESERVED_FIELDS: &[&str] = &[
"severity",
"title",
"query",
"queryType",
"query_type",
"creationType",
"creation_type",
"datasets",
"alertType",
"alert_type",
Expand Down Expand Up @@ -282,6 +287,10 @@ pub struct AlertRequest {
pub severity: Severity,
pub title: String,
pub query: String,
#[serde(default)]
pub query_type: AlertQueryType,
#[serde(default)]
pub datasets: Vec<String>,
pub alert_type: String,
pub anomaly_config: Option<AnomalyConfig>,
pub forecast_config: Option<ForecastConfig>,
Expand Down Expand Up @@ -326,11 +335,24 @@ impl AlertRequest {
for id in &self.targets {
TARGETS.get_target_by_id(id, &tenant_id).await?;
}
let datasets = resolve_stream_names(&self.query)?;
let datasets = match self.query_type {
AlertQueryType::Builder | AlertQueryType::Code => resolve_stream_names(&self.query)?,
AlertQueryType::Promql => self.datasets,
};

if datasets.len() != 1 {
return Err(AlertError::ValidationFailure(format!(
"Query should include only one dataset. Found: {datasets:?}"
"Alert should include only one dataset. Found: {datasets:?}"
)));
}
if self.query_type == AlertQueryType::Promql
&& !PARSEABLE
.check_or_load_stream(&datasets[0], &tenant_id)
.await
{
return Err(AlertError::ValidationFailure(format!(
"Invalid PromQL metrics stream: {}",
datasets[0]
)));
}

Expand All @@ -342,6 +364,7 @@ impl AlertRequest {
severity: self.severity,
title: self.title,
query: self.query,
query_type: self.query_type,
datasets,
alert_type: {
match self.alert_type.as_str() {
Expand Down Expand Up @@ -393,6 +416,8 @@ pub struct AlertConfig {
pub severity: Severity,
pub title: String,
pub query: String,
#[serde(default)]
pub query_type: AlertQueryType,
pub datasets: Vec<String>,
pub alert_type: AlertType,
pub threshold_config: ThresholdConfig,
Expand Down Expand Up @@ -420,6 +445,7 @@ pub struct AlertConfigResponse {
pub severity: Severity,
pub title: String,
pub query: String,
pub query_type: AlertQueryType,
pub datasets: Vec<String>,
pub alert_type: &'static str,
pub anomaly_config: Option<AnomalyConfig>,
Expand Down Expand Up @@ -466,6 +492,7 @@ impl AlertConfig {
severity: self.severity,
title: self.title,
query: self.query,
query_type: self.query_type,
datasets: self.datasets,
alert_type: {
match self.alert_type {
Expand Down
3 changes: 2 additions & 1 deletion src/alerts/alert_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::{
alerts::{
AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity,
AlertConfig, AlertError, AlertQueryType, AlertState, AlertType, EvalConfig, Severity,
alert_enums::NotificationState,
alert_structs::{Context, ThresholdConfig},
},
Expand Down Expand Up @@ -64,6 +64,7 @@ pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
fn get_severity(&self) -> &Severity;
fn get_title(&self) -> &str;
fn get_query(&self) -> &str;
fn get_query_type(&self) -> AlertQueryType;
fn get_alert_type(&self) -> &AlertType;
fn get_threshold_config(&self) -> &ThresholdConfig;
fn get_eval_config(&self) -> &EvalConfig;
Expand Down
24 changes: 22 additions & 2 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use ulid::Ulid;

use crate::{
alerts::{
AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity,
ThresholdConfig,
AlertConfig, AlertError, AlertQueryType, AlertState, AlertType, AlertVersion, EvalConfig,
Severity, ThresholdConfig,
alert_enums::NotificationState,
alert_structs::{AlertStateEntry, GroupResult},
alert_traits::{AlertTrait, MessageCreation},
Expand Down Expand Up @@ -58,6 +58,8 @@ pub struct ThresholdAlert {
pub severity: Severity,
pub title: String,
pub query: String,
#[serde(default)]
pub query_type: AlertQueryType,
pub alert_type: AlertType,
pub threshold_config: ThresholdConfig,
pub eval_config: EvalConfig,
Expand Down Expand Up @@ -89,6 +91,8 @@ impl MetastoreObject for ThresholdAlert {
#[async_trait]
impl AlertTrait for ThresholdAlert {
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
self.validate_oss_query_type()?;

let time_range = extract_time_range(&self.eval_config)?;

let tenant = self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
Expand Down Expand Up @@ -188,6 +192,8 @@ impl AlertTrait for ThresholdAlert {
return Err(AlertError::InvalidAlertQuery("Empty query".into()));
}

self.validate_oss_query_type()?;

let tables = resolve_stream_names(&self.query)?;
if tables.is_empty() {
return Err(AlertError::InvalidAlertQuery(
Expand Down Expand Up @@ -315,6 +321,10 @@ impl AlertTrait for ThresholdAlert {
&self.query
}

fn get_query_type(&self) -> AlertQueryType {
self.query_type
}

fn get_severity(&self) -> &Severity {
&self.severity
}
Expand Down Expand Up @@ -436,6 +446,7 @@ impl From<AlertConfig> for ThresholdAlert {
severity: value.severity,
title: value.title,
query: value.query,
query_type: value.query_type,
alert_type: value.alert_type,
threshold_config: value.threshold_config,
eval_config: value.eval_config,
Expand All @@ -461,6 +472,7 @@ impl From<ThresholdAlert> for AlertConfig {
severity: val.severity,
title: val.title,
query: val.query,
query_type: val.query_type,
alert_type: val.alert_type,
threshold_config: val.threshold_config,
eval_config: val.eval_config,
Expand All @@ -479,6 +491,14 @@ impl From<ThresholdAlert> for AlertConfig {
}

impl ThresholdAlert {
pub(crate) fn validate_oss_query_type(&self) -> Result<(), AlertError> {
if self.query_type == AlertQueryType::Promql {
return Err(AlertError::NotPresentInOSS("promql alerts"));
}

Ok(())
}

fn create_group_message(&self, breached_groups: &[GroupResult]) -> Result<String, AlertError> {
let header = self.get_message_header()?;
let mut message = format!("{header}\n");
Expand Down
Loading
Loading