From e822a92e917e86889ffc8e4595bc3e4a2fc04e5a Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 12 May 2026 17:32:51 -0400 Subject: [PATCH 01/12] add retry.rs --- src/config.rs | 5 + src/main.rs | 5 +- src/store.rs | 1 + src/store/retry.rs | 574 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 584 insertions(+), 1 deletion(-) create mode 100644 src/store/retry.rs diff --git a/src/config.rs b/src/config.rs index 700f1a5a..7338ea60 100644 --- a/src/config.rs +++ b/src/config.rs @@ -180,6 +180,10 @@ pub struct Config { /// The amount of time to wait before retrying writes to db when write fails. pub db_write_failure_backoff_ms: u64, + /// The maximum number of times to retry a transient database query error + /// before surfacing the error. Uses exponential backoff between retries. + pub db_query_max_retries: u32, + /// The maximum number of tasks that are buffered /// before being written to InflightTaskStore (sqlite). pub db_insert_batch_max_len: usize, @@ -378,6 +382,7 @@ impl Default for Config { pg_default_database_name: "postgres".to_owned(), pg_extra_query_params: None, db_write_failure_backoff_ms: 4000, + db_query_max_retries: 3, db_insert_batch_max_len: 256, db_insert_batch_max_size: 16_000_000, db_insert_batch_max_time_ms: 1000, diff --git a/src/main.rs b/src/main.rs index c91a766f..4bbc8ec4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,6 +37,7 @@ use taskbroker::store::adapters::postgres::{ PostgresActivationStore, PostgresActivationStoreConfig, }; use taskbroker::store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore}; +use taskbroker::store::retry::RetryStore; use taskbroker::store::traits::InflightActivationStore; use taskbroker::upkeep::upkeep; use taskbroker::{Args, get_version}; @@ -68,7 +69,7 @@ async fn main() -> Result<(), Error> { logging::init(logging::LoggingConfig::from_config(&config)); metrics::init(metrics::MetricsConfig::from_config(&config)); - let store: Arc = match config.database_adapter { + let inner_store: Arc = match config.database_adapter { DatabaseAdapter::Sqlite => Arc::new( SqliteActivationStore::new( &config.db_path, @@ -81,6 +82,8 @@ async fn main() -> Result<(), Error> { .await?, ), }; + let store: Arc = + Arc::new(RetryStore::new(inner_store, config.db_query_max_retries)); // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { diff --git a/src/store.rs b/src/store.rs index bdc6e8df..c2d65951 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,5 +1,6 @@ pub mod activation; pub mod adapters; +pub mod retry; pub mod traits; pub mod types; diff --git a/src/store/retry.rs b/src/store/retry.rs new file mode 100644 index 00000000..38e927f9 --- /dev/null +++ b/src/store/retry.rs @@ -0,0 +1,574 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Error; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use tokio::time::sleep; +use tracing::{info, warn}; + +use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::traits::InflightActivationStore; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; + +const BASE_BACKOFF_MS: u64 = 100; +const MAX_BACKOFF_MS: u64 = 5000; + +/// Returns true if the error message indicates a transient database error +/// that is likely to succeed on retry. +fn is_transient_error(err: &Error) -> bool { + let msg = err.to_string(); + msg.contains("server conn crashed") + || msg.contains("server shutting down") + || msg.contains("connection reset") + || msg.contains("broken pipe") + || msg.contains("connection refused") + || msg.contains("pool timed out") +} + +/// Calculates the backoff duration for a given attempt using exponential backoff. +fn backoff_duration(attempt: u32) -> Duration { + let ms = BASE_BACKOFF_MS + .saturating_mul(1 << attempt) + .min(MAX_BACKOFF_MS); + Duration::from_millis(ms) +} + +/// A wrapper around an `InflightActivationStore` that retries transient +/// database errors with exponential backoff. +pub struct RetryStore { + inner: Arc, + max_retries: u32, +} + +impl RetryStore { + pub fn new(inner: Arc, max_retries: u32) -> Self { + Self { inner, max_retries } + } +} + +/// Macro to reduce boilerplate for delegating trait methods with retry logic. +/// For each method call, if the inner store returns a transient error, +/// we retry up to `self.max_retries` times with exponential backoff. +macro_rules! retry_method { + ($self:ident, $method:ident ( $($arg:expr),* $(,)? )) => {{ + let mut attempt = 0u32; + loop { + match $self.inner.$method( $($arg),* ).await { + Ok(val) => { + if attempt > 0 { + info!( + method = stringify!($method), + attempt, + "Query succeeded after retry" + ); + metrics::counter!( + "store.retry.succeeded", + "method" => stringify!($method), + ) + .increment(1); + } + return Ok(val); + } + Err(err) if attempt < $self.max_retries && is_transient_error(&err) => { + let backoff = backoff_duration(attempt); + warn!( + method = stringify!($method), + attempt, + backoff_ms = backoff.as_millis() as u64, + error = %err, + "Transient database error, retrying" + ); + metrics::counter!( + "store.retry.attempt", + "method" => stringify!($method), + ) + .increment(1); + sleep(backoff).await; + attempt += 1; + } + Err(err) => { + if attempt > 0 { + metrics::counter!( + "store.retry.exhausted", + "method" => stringify!($method), + ) + .increment(1); + } + return Err(err); + } + } + } + }}; +} + +/// Same as `retry_method!` but clones an owned argument before each attempt, +/// since the inner method consumes it. +macro_rules! retry_method_clone { + ($self:ident, $method:ident, $owned_arg:expr $(,)?) => {{ + let mut attempt = 0u32; + loop { + let cloned = $owned_arg.clone(); + match $self.inner.$method(cloned).await { + Ok(val) => { + if attempt > 0 { + info!( + method = stringify!($method), + attempt, + "Query succeeded after retry" + ); + metrics::counter!( + "store.retry.succeeded", + "method" => stringify!($method), + ) + .increment(1); + } + return Ok(val); + } + Err(err) if attempt < $self.max_retries && is_transient_error(&err) => { + let backoff = backoff_duration(attempt); + warn!( + method = stringify!($method), + attempt, + backoff_ms = backoff.as_millis() as u64, + error = %err, + "Transient database error, retrying" + ); + metrics::counter!( + "store.retry.attempt", + "method" => stringify!($method), + ) + .increment(1); + sleep(backoff).await; + attempt += 1; + } + Err(err) => { + if attempt > 0 { + metrics::counter!( + "store.retry.exhausted", + "method" => stringify!($method), + ) + .increment(1); + } + return Err(err); + } + } + } + }}; +} + +#[async_trait] +impl InflightActivationStore for RetryStore { + async fn store(&self, batch: Vec) -> Result { + retry_method_clone!(self, store, batch) + } + + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + self.inner.assign_partitions(partitions) + } + + async fn claim_activations( + &self, + application: Option<&str>, + namespaces: Option<&[String]>, + limit: Option, + bucket: Option, + mark_processing: bool, + ) -> Result, Error> { + retry_method!( + self, + claim_activations(application, namespaces, limit, bucket, mark_processing) + ) + } + + async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { + retry_method!(self, mark_activation_processing(id)) + } + + async fn set_status( + &self, + id: &str, + status: InflightActivationStatus, + ) -> Result, Error> { + retry_method!(self, set_status(id, status)) + } + + async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { + self.inner.pending_activation_max_lag(now).await + } + + async fn count_by_status(&self, status: InflightActivationStatus) -> Result { + retry_method!(self, count_by_status(status)) + } + + async fn count(&self) -> Result { + retry_method!(self, count()) + } + + async fn get_by_id(&self, id: &str) -> Result, Error> { + retry_method!(self, get_by_id(id)) + } + + async fn count_depths(&self) -> Result { + retry_method!(self, count_depths()) + } + + async fn set_processing_deadline( + &self, + id: &str, + deadline: Option>, + ) -> Result<(), Error> { + retry_method!(self, set_processing_deadline(id, deadline)) + } + + async fn delete_activation(&self, id: &str) -> Result<(), Error> { + retry_method!(self, delete_activation(id)) + } + + async fn vacuum_db(&self) -> Result<(), Error> { + retry_method!(self, vacuum_db()) + } + + async fn full_vacuum_db(&self) -> Result<(), Error> { + retry_method!(self, full_vacuum_db()) + } + + async fn db_size(&self) -> Result { + retry_method!(self, db_size()) + } + + async fn get_retry_activations(&self) -> Result, Error> { + retry_method!(self, get_retry_activations()) + } + + async fn handle_claim_expiration(&self) -> Result { + retry_method!(self, handle_claim_expiration()) + } + + async fn handle_processing_deadline(&self) -> Result { + retry_method!(self, handle_processing_deadline()) + } + + async fn handle_processing_attempts(&self) -> Result { + retry_method!(self, handle_processing_attempts()) + } + + async fn handle_expires_at(&self) -> Result { + retry_method!(self, handle_expires_at()) + } + + async fn handle_delay_until(&self) -> Result { + retry_method!(self, handle_delay_until()) + } + + async fn handle_failed_tasks(&self) -> Result { + retry_method!(self, handle_failed_tasks()) + } + + async fn mark_completed(&self, ids: Vec) -> Result { + retry_method_clone!(self, mark_completed, ids) + } + + async fn remove_completed(&self) -> Result { + retry_method!(self, remove_completed()) + } + + async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { + retry_method_clone!(self, remove_killswitched, killswitched_tasks) + } + + async fn clear(&self) -> Result<(), Error> { + retry_method!(self, clear()) + } + + async fn remove_db(&self) -> Result<(), Error> { + self.inner.remove_db().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::anyhow; + use std::sync::atomic::{AtomicU32, Ordering}; + + /// A mock store that fails with a configurable error a set number of + /// times before succeeding. + struct MockFailingStore { + fail_count: AtomicU32, + transient: bool, + } + + impl MockFailingStore { + fn new(fail_count: u32, transient: bool) -> Self { + Self { + fail_count: AtomicU32::new(fail_count), + transient, + } + } + + fn make_error(&self) -> Error { + if self.transient { + anyhow!("error returned from database: server conn crashed?") + } else { + anyhow!("some non-transient error") + } + } + + fn should_fail(&self) -> bool { + let remaining = self.fail_count.load(Ordering::SeqCst); + if remaining > 0 { + self.fail_count.fetch_sub(1, Ordering::SeqCst); + true + } else { + false + } + } + } + + #[async_trait] + impl InflightActivationStore for MockFailingStore { + async fn store(&self, _batch: Vec) -> Result { + if self.should_fail() { + Err(self.make_error()) + } else { + Ok(1) + } + } + + fn assign_partitions(&self, _partitions: Vec) -> Result<(), Error> { + Ok(()) + } + + async fn claim_activations( + &self, + _application: Option<&str>, + _namespaces: Option<&[String]>, + _limit: Option, + _bucket: Option, + _mark_processing: bool, + ) -> Result, Error> { + if self.should_fail() { + Err(self.make_error()) + } else { + Ok(vec![]) + } + } + + async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> { + if self.should_fail() { + Err(self.make_error()) + } else { + Ok(()) + } + } + + async fn set_status( + &self, + _id: &str, + _status: InflightActivationStatus, + ) -> Result, Error> { + if self.should_fail() { + Err(self.make_error()) + } else { + Ok(None) + } + } + + async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { + 0.0 + } + + async fn count_by_status( + &self, + _status: InflightActivationStatus, + ) -> Result { + Ok(0) + } + + async fn count(&self) -> Result { + Ok(0) + } + + async fn get_by_id(&self, _id: &str) -> Result, Error> { + Ok(None) + } + + async fn set_processing_deadline( + &self, + _id: &str, + _deadline: Option>, + ) -> Result<(), Error> { + Ok(()) + } + + async fn delete_activation(&self, _id: &str) -> Result<(), Error> { + Ok(()) + } + + async fn vacuum_db(&self) -> Result<(), Error> { + Ok(()) + } + + async fn full_vacuum_db(&self) -> Result<(), Error> { + Ok(()) + } + + async fn db_size(&self) -> Result { + Ok(0) + } + + async fn get_retry_activations(&self) -> Result, Error> { + Ok(vec![]) + } + + async fn handle_claim_expiration(&self) -> Result { + Ok(0) + } + + async fn handle_processing_deadline(&self) -> Result { + Ok(0) + } + + async fn handle_processing_attempts(&self) -> Result { + Ok(0) + } + + async fn handle_expires_at(&self) -> Result { + Ok(0) + } + + async fn handle_delay_until(&self) -> Result { + Ok(0) + } + + async fn handle_failed_tasks(&self) -> Result { + Ok(FailedTasksForwarder { + to_discard: vec![], + to_deadletter: vec![], + }) + } + + async fn mark_completed(&self, _ids: Vec) -> Result { + Ok(0) + } + + async fn remove_completed(&self) -> Result { + Ok(0) + } + + async fn remove_killswitched( + &self, + _killswitched_tasks: Vec, + ) -> Result { + Ok(0) + } + + async fn clear(&self) -> Result<(), Error> { + Ok(()) + } + } + + #[tokio::test] + async fn test_retry_succeeds_after_transient_errors() { + let mock = Arc::new(MockFailingStore::new(2, true)); + let store = RetryStore::new(mock, 3); + + let result = store.store(vec![]).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 1); + } + + #[tokio::test] + async fn test_retry_exhausted_surfaces_error() { + let mock = Arc::new(MockFailingStore::new(5, true)); + let store = RetryStore::new(mock, 3); + + let result = store.store(vec![]).await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("server conn crashed") + ); + } + + #[tokio::test] + async fn test_non_transient_error_not_retried() { + let mock = Arc::new(MockFailingStore::new(1, false)); + let store = RetryStore::new(mock.clone(), 3); + + let result = store.store(vec![]).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("non-transient")); + // The fail count was decremented once (the initial attempt), no retries + assert_eq!(mock.fail_count.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn test_set_status_retries_on_transient_error() { + let mock = Arc::new(MockFailingStore::new(1, true)); + let store = RetryStore::new(mock, 3); + + let result = store + .set_status("test-id", InflightActivationStatus::Complete) + .await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_claim_activations_retries_on_transient_error() { + let mock = Arc::new(MockFailingStore::new(2, true)); + let store = RetryStore::new(mock, 3); + + let result = store + .claim_activations(None, None, Some(1), None, true) + .await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_mark_activation_processing_retries_on_transient_error() { + let mock = Arc::new(MockFailingStore::new(2, true)); + let store = RetryStore::new(mock, 3); + + let result = store.mark_activation_processing("test-id").await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_zero_retries_surfaces_immediately() { + let mock = Arc::new(MockFailingStore::new(1, true)); + let store = RetryStore::new(mock, 0); + + let result = store.store(vec![]).await; + assert!(result.is_err()); + } + + #[test] + fn test_is_transient_error() { + assert!(is_transient_error(&anyhow!( + "Unable to write to sqlite: error returned from database: server conn crashed?" + ))); + assert!(is_transient_error(&anyhow!( + "error returned from database: server shutting down" + ))); + assert!(is_transient_error(&anyhow!("connection reset"))); + assert!(is_transient_error(&anyhow!("broken pipe"))); + assert!(is_transient_error(&anyhow!("connection refused"))); + assert!(is_transient_error(&anyhow!("pool timed out"))); + assert!(!is_transient_error(&anyhow!("unique constraint violation"))); + assert!(!is_transient_error(&anyhow!("syntax error in SQL"))); + } + + #[test] + fn test_backoff_duration() { + assert_eq!(backoff_duration(0), Duration::from_millis(100)); + assert_eq!(backoff_duration(1), Duration::from_millis(200)); + assert_eq!(backoff_duration(2), Duration::from_millis(400)); + assert_eq!(backoff_duration(3), Duration::from_millis(800)); + // Should cap at MAX_BACKOFF_MS + assert_eq!(backoff_duration(10), Duration::from_millis(MAX_BACKOFF_MS)); + } +} From 66525aad6d94b1d5000ffdc875aa411ce59e732f Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 12 May 2026 22:53:42 -0400 Subject: [PATCH 02/12] finalize tests --- src/config.rs | 5 +- src/main.rs | 6 ++- src/store/retry.rs | 132 ++++++++++++++++++++++++++++----------------- 3 files changed, 90 insertions(+), 53 deletions(-) diff --git a/src/config.rs b/src/config.rs index 7338ea60..696dfebe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -182,7 +182,8 @@ pub struct Config { /// The maximum number of times to retry a transient database query error /// before surfacing the error. Uses exponential backoff between retries. - pub db_query_max_retries: u32, + /// When None, queries are not retried. + pub db_query_max_retries: Option, /// The maximum number of tasks that are buffered /// before being written to InflightTaskStore (sqlite). @@ -382,7 +383,7 @@ impl Default for Config { pg_default_database_name: "postgres".to_owned(), pg_extra_query_params: None, db_write_failure_backoff_ms: 4000, - db_query_max_retries: 3, + db_query_max_retries: None, db_insert_batch_max_len: 256, db_insert_batch_max_size: 16_000_000, db_insert_batch_max_time_ms: 1000, diff --git a/src/main.rs b/src/main.rs index 4bbc8ec4..49abb52e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,8 +82,10 @@ async fn main() -> Result<(), Error> { .await?, ), }; - let store: Arc = - Arc::new(RetryStore::new(inner_store, config.db_query_max_retries)); + let store: Arc = match config.db_query_max_retries { + Some(max_retries) => Arc::new(RetryStore::new(inner_store, max_retries)), + None => inner_store, + }; // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { diff --git a/src/store/retry.rs b/src/store/retry.rs index 38e927f9..cc23d4b2 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -14,16 +14,17 @@ use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; const BASE_BACKOFF_MS: u64 = 100; const MAX_BACKOFF_MS: u64 = 5000; -/// Returns true if the error message indicates a transient database error -/// that is likely to succeed on retry. -fn is_transient_error(err: &Error) -> bool { - let msg = err.to_string(); - msg.contains("server conn crashed") - || msg.contains("server shutting down") - || msg.contains("connection reset") - || msg.contains("broken pipe") - || msg.contains("connection refused") - || msg.contains("pool timed out") +/// Returns true if the error is a transient database/connection error +/// that is likely to succeed on retry. Downcasts the anyhow::Error to +/// sqlx::Error to match on structured variants rather than parsing strings. +fn is_retryable_error(err: &Error) -> bool { + match err.downcast_ref::() { + Some(sqlx::Error::Io(_)) => true, + Some(sqlx::Error::PoolTimedOut) => true, + Some(sqlx::Error::PoolClosed) => true, + Some(sqlx::Error::WorkerCrashed) => true, + _ => false, + } } /// Calculates the backoff duration for a given attempt using exponential backoff. @@ -34,8 +35,8 @@ fn backoff_duration(attempt: u32) -> Duration { Duration::from_millis(ms) } -/// A wrapper around an `InflightActivationStore` that retries transient -/// database errors with exponential backoff. +/// A wrapper around an `InflightActivationStore` that retries failed +/// database queries with exponential backoff. pub struct RetryStore { inner: Arc, max_retries: u32, @@ -48,7 +49,7 @@ impl RetryStore { } /// Macro to reduce boilerplate for delegating trait methods with retry logic. -/// For each method call, if the inner store returns a transient error, +/// For each method call, if the inner store returns a retryable error, /// we retry up to `self.max_retries` times with exponential backoff. macro_rules! retry_method { ($self:ident, $method:ident ( $($arg:expr),* $(,)? )) => {{ @@ -70,14 +71,14 @@ macro_rules! retry_method { } return Ok(val); } - Err(err) if attempt < $self.max_retries && is_transient_error(&err) => { + Err(err) if attempt < $self.max_retries && is_retryable_error(&err) => { let backoff = backoff_duration(attempt); warn!( method = stringify!($method), attempt, backoff_ms = backoff.as_millis() as u64, error = %err, - "Transient database error, retrying" + "Retryable database error, retrying" ); metrics::counter!( "store.retry.attempt", @@ -125,14 +126,14 @@ macro_rules! retry_method_clone { } return Ok(val); } - Err(err) if attempt < $self.max_retries && is_transient_error(&err) => { + Err(err) if attempt < $self.max_retries && is_retryable_error(&err) => { let backoff = backoff_duration(attempt); warn!( method = stringify!($method), attempt, backoff_ms = backoff.as_millis() as u64, error = %err, - "Transient database error, retrying" + "Retryable database error, retrying" ); metrics::counter!( "store.retry.attempt", @@ -289,29 +290,40 @@ impl InflightActivationStore for RetryStore { #[cfg(test)] mod tests { use super::*; - use anyhow::anyhow; + use rstest::rstest; use std::sync::atomic::{AtomicU32, Ordering}; - /// A mock store that fails with a configurable error a set number of - /// times before succeeding. + /// Helper to create a retryable error (sqlx transient error wrapped in anyhow). + fn retryable_error() -> Error { + Error::from(sqlx::Error::PoolTimedOut) + } + + /// Helper to create a non-retryable error (sqlx database/logic error wrapped in anyhow). + fn non_retryable_error() -> Error { + Error::from(sqlx::Error::RowNotFound) + } + + /// A mock store that fails a set number of times before succeeding. + /// The `retryable` flag controls whether errors are transient (retryable) + /// or permanent (non-retryable). struct MockFailingStore { fail_count: AtomicU32, - transient: bool, + retryable: bool, } impl MockFailingStore { - fn new(fail_count: u32, transient: bool) -> Self { + fn new(fail_count: u32, retryable: bool) -> Self { Self { fail_count: AtomicU32::new(fail_count), - transient, + retryable, } } fn make_error(&self) -> Error { - if self.transient { - anyhow!("error returned from database: server conn crashed?") + if self.retryable { + retryable_error() } else { - anyhow!("some non-transient error") + non_retryable_error() } } @@ -470,7 +482,7 @@ mod tests { } #[tokio::test] - async fn test_retry_succeeds_after_transient_errors() { + async fn test_retry_succeeds_after_retryable_errors() { let mock = Arc::new(MockFailingStore::new(2, true)); let store = RetryStore::new(mock, 3); @@ -486,28 +498,21 @@ mod tests { let result = store.store(vec![]).await; assert!(result.is_err()); - assert!( - result - .unwrap_err() - .to_string() - .contains("server conn crashed") - ); } #[tokio::test] - async fn test_non_transient_error_not_retried() { + async fn test_non_retryable_error_not_retried() { let mock = Arc::new(MockFailingStore::new(1, false)); let store = RetryStore::new(mock.clone(), 3); let result = store.store(vec![]).await; assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("non-transient")); // The fail count was decremented once (the initial attempt), no retries assert_eq!(mock.fail_count.load(Ordering::SeqCst), 0); } #[tokio::test] - async fn test_set_status_retries_on_transient_error() { + async fn test_set_status_retries_on_retryable_error() { let mock = Arc::new(MockFailingStore::new(1, true)); let store = RetryStore::new(mock, 3); @@ -518,7 +523,7 @@ mod tests { } #[tokio::test] - async fn test_claim_activations_retries_on_transient_error() { + async fn test_claim_activations_retries_on_retryable_error() { let mock = Arc::new(MockFailingStore::new(2, true)); let store = RetryStore::new(mock, 3); @@ -529,7 +534,7 @@ mod tests { } #[tokio::test] - async fn test_mark_activation_processing_retries_on_transient_error() { + async fn test_mark_activation_processing_retries_on_retryable_error() { let mock = Arc::new(MockFailingStore::new(2, true)); let store = RetryStore::new(mock, 3); @@ -546,20 +551,49 @@ mod tests { assert!(result.is_err()); } + /// Mirrors the main.rs wiring: when max_retries is None, RetryStore is not + /// used and the inner store is called directly — retryable errors surface + /// immediately. When Some, RetryStore wraps the inner store and retries. + #[rstest] + #[case::none_bypasses_retry(None, true)] + #[case::some_enables_retry(Some(3), false)] + #[tokio::test] + async fn test_config_retry_wiring( + #[case] max_retries: Option, + #[case] expect_err: bool, + ) { + // Mock fails once with a retryable error then succeeds + let inner: Arc = + Arc::new(MockFailingStore::new(1, true)); + + // Simulate main.rs wiring + let store: Arc = match max_retries { + Some(n) => Arc::new(RetryStore::new(inner, n)), + None => inner, + }; + + let result = store.store(vec![]).await; + assert_eq!(result.is_err(), expect_err); + } + #[test] - fn test_is_transient_error() { - assert!(is_transient_error(&anyhow!( - "Unable to write to sqlite: error returned from database: server conn crashed?" + fn test_is_retryable_error() { + // Retryable: transient connection/pool errors + assert!(is_retryable_error(&Error::from(sqlx::Error::PoolTimedOut))); + assert!(is_retryable_error(&Error::from(sqlx::Error::PoolClosed))); + assert!(is_retryable_error(&Error::from(sqlx::Error::WorkerCrashed))); + assert!(is_retryable_error(&Error::from(sqlx::Error::Io( + std::io::Error::new(std::io::ErrorKind::ConnectionReset, "connection reset") + )))); + + // Not retryable: logic/schema errors + assert!(!is_retryable_error(&Error::from(sqlx::Error::RowNotFound))); + assert!(!is_retryable_error(&Error::from( + sqlx::Error::ColumnNotFound("id".into()) ))); - assert!(is_transient_error(&anyhow!( - "error returned from database: server shutting down" + assert!(!is_retryable_error(&Error::from( + sqlx::Error::Protocol("unexpected".into()) ))); - assert!(is_transient_error(&anyhow!("connection reset"))); - assert!(is_transient_error(&anyhow!("broken pipe"))); - assert!(is_transient_error(&anyhow!("connection refused"))); - assert!(is_transient_error(&anyhow!("pool timed out"))); - assert!(!is_transient_error(&anyhow!("unique constraint violation"))); - assert!(!is_transient_error(&anyhow!("syntax error in SQL"))); } #[test] From dc55d5f4c2f4821cdd857f4fed56a8efead875ff Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 12 May 2026 23:14:48 -0400 Subject: [PATCH 03/12] whitespace formatting --- src/store/retry.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/store/retry.rs b/src/store/retry.rs index cc23d4b2..1ed995a8 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -391,10 +391,7 @@ mod tests { 0.0 } - async fn count_by_status( - &self, - _status: InflightActivationStatus, - ) -> Result { + async fn count_by_status(&self, _status: InflightActivationStatus) -> Result { Ok(0) } @@ -558,13 +555,9 @@ mod tests { #[case::none_bypasses_retry(None, true)] #[case::some_enables_retry(Some(3), false)] #[tokio::test] - async fn test_config_retry_wiring( - #[case] max_retries: Option, - #[case] expect_err: bool, - ) { + async fn test_config_retry_wiring(#[case] max_retries: Option, #[case] expect_err: bool) { // Mock fails once with a retryable error then succeeds - let inner: Arc = - Arc::new(MockFailingStore::new(1, true)); + let inner: Arc = Arc::new(MockFailingStore::new(1, true)); // Simulate main.rs wiring let store: Arc = match max_retries { @@ -591,9 +584,9 @@ mod tests { assert!(!is_retryable_error(&Error::from( sqlx::Error::ColumnNotFound("id".into()) ))); - assert!(!is_retryable_error(&Error::from( - sqlx::Error::Protocol("unexpected".into()) - ))); + assert!(!is_retryable_error(&Error::from(sqlx::Error::Protocol( + "unexpected".into() + )))); } #[test] From 916a0a45f63b63cadbdfde559ce1c4f5c35af168 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 12 May 2026 23:22:45 -0400 Subject: [PATCH 04/12] comment --- src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 696dfebe..3f9214e0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -182,7 +182,7 @@ pub struct Config { /// The maximum number of times to retry a transient database query error /// before surfacing the error. Uses exponential backoff between retries. - /// When None, queries are not retried. + /// When None, queries are not retried. Must not exceed 63 retries although something else must be wrong if it goes that high. pub db_query_max_retries: Option, /// The maximum number of tasks that are buffered From 38c935a96669eb9cffa2c60e91433058b2656707 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 12 May 2026 23:32:44 -0400 Subject: [PATCH 05/12] clippy wants matches macro --- src/store/retry.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/store/retry.rs b/src/store/retry.rs index 1ed995a8..e9c3b5bc 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -18,13 +18,13 @@ const MAX_BACKOFF_MS: u64 = 5000; /// that is likely to succeed on retry. Downcasts the anyhow::Error to /// sqlx::Error to match on structured variants rather than parsing strings. fn is_retryable_error(err: &Error) -> bool { - match err.downcast_ref::() { - Some(sqlx::Error::Io(_)) => true, - Some(sqlx::Error::PoolTimedOut) => true, - Some(sqlx::Error::PoolClosed) => true, - Some(sqlx::Error::WorkerCrashed) => true, - _ => false, - } + matches!( + err.downcast_ref::(), + Some(sqlx::Error::Io(_)) + | Some(sqlx::Error::PoolTimedOut) + | Some(sqlx::Error::PoolClosed) + | Some(sqlx::Error::WorkerCrashed) + ) } /// Calculates the backoff duration for a given attempt using exponential backoff. From 841cdb242396b40ff93c2f2854b6a728b11de543 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 13 May 2026 12:10:45 -0400 Subject: [PATCH 06/12] remove exponential backoff --- src/config.rs | 7 ++++-- src/main.rs | 6 ++++- src/store/retry.rs | 62 +++++++++++++++++----------------------------- 3 files changed, 33 insertions(+), 42 deletions(-) diff --git a/src/config.rs b/src/config.rs index 3f9214e0..3dd5d290 100644 --- a/src/config.rs +++ b/src/config.rs @@ -181,10 +181,12 @@ pub struct Config { pub db_write_failure_backoff_ms: u64, /// The maximum number of times to retry a transient database query error - /// before surfacing the error. Uses exponential backoff between retries. - /// When None, queries are not retried. Must not exceed 63 retries although something else must be wrong if it goes that high. + /// before surfacing the error. When None, queries are not retried. pub db_query_max_retries: Option, + /// The delay in milliseconds between query retry attempts. + pub db_query_retry_delay_ms: u64, + /// The maximum number of tasks that are buffered /// before being written to InflightTaskStore (sqlite). pub db_insert_batch_max_len: usize, @@ -384,6 +386,7 @@ impl Default for Config { pg_extra_query_params: None, db_write_failure_backoff_ms: 4000, db_query_max_retries: None, + db_query_retry_delay_ms: 100, db_insert_batch_max_len: 256, db_insert_batch_max_size: 16_000_000, db_insert_batch_max_time_ms: 1000, diff --git a/src/main.rs b/src/main.rs index 49abb52e..48f9c038 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,7 +83,11 @@ async fn main() -> Result<(), Error> { ), }; let store: Arc = match config.db_query_max_retries { - Some(max_retries) => Arc::new(RetryStore::new(inner_store, max_retries)), + Some(max_retries) => Arc::new(RetryStore::new( + inner_store, + max_retries, + config.db_query_retry_delay_ms, + )), None => inner_store, }; diff --git a/src/store/retry.rs b/src/store/retry.rs index e9c3b5bc..330b68d7 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -11,9 +11,6 @@ use crate::store::activation::{InflightActivation, InflightActivationStatus}; use crate::store::traits::InflightActivationStore; use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; -const BASE_BACKOFF_MS: u64 = 100; -const MAX_BACKOFF_MS: u64 = 5000; - /// Returns true if the error is a transient database/connection error /// that is likely to succeed on retry. Downcasts the anyhow::Error to /// sqlx::Error to match on structured variants rather than parsing strings. @@ -27,30 +24,31 @@ fn is_retryable_error(err: &Error) -> bool { ) } -/// Calculates the backoff duration for a given attempt using exponential backoff. -fn backoff_duration(attempt: u32) -> Duration { - let ms = BASE_BACKOFF_MS - .saturating_mul(1 << attempt) - .min(MAX_BACKOFF_MS); - Duration::from_millis(ms) -} - /// A wrapper around an `InflightActivationStore` that retries failed -/// database queries with exponential backoff. +/// database queries with a fixed delay between attempts. pub struct RetryStore { inner: Arc, max_retries: u32, + retry_delay: Duration, } impl RetryStore { - pub fn new(inner: Arc, max_retries: u32) -> Self { - Self { inner, max_retries } + pub fn new( + inner: Arc, + max_retries: u32, + retry_delay_ms: u64, + ) -> Self { + Self { + inner, + max_retries, + retry_delay: Duration::from_millis(retry_delay_ms), + } } } /// Macro to reduce boilerplate for delegating trait methods with retry logic. /// For each method call, if the inner store returns a retryable error, -/// we retry up to `self.max_retries` times with exponential backoff. +/// we retry up to `self.max_retries` times with a fixed delay. macro_rules! retry_method { ($self:ident, $method:ident ( $($arg:expr),* $(,)? )) => {{ let mut attempt = 0u32; @@ -72,11 +70,9 @@ macro_rules! retry_method { return Ok(val); } Err(err) if attempt < $self.max_retries && is_retryable_error(&err) => { - let backoff = backoff_duration(attempt); warn!( method = stringify!($method), attempt, - backoff_ms = backoff.as_millis() as u64, error = %err, "Retryable database error, retrying" ); @@ -85,7 +81,7 @@ macro_rules! retry_method { "method" => stringify!($method), ) .increment(1); - sleep(backoff).await; + sleep($self.retry_delay).await; attempt += 1; } Err(err) => { @@ -127,11 +123,9 @@ macro_rules! retry_method_clone { return Ok(val); } Err(err) if attempt < $self.max_retries && is_retryable_error(&err) => { - let backoff = backoff_duration(attempt); warn!( method = stringify!($method), attempt, - backoff_ms = backoff.as_millis() as u64, error = %err, "Retryable database error, retrying" ); @@ -140,7 +134,7 @@ macro_rules! retry_method_clone { "method" => stringify!($method), ) .increment(1); - sleep(backoff).await; + sleep($self.retry_delay).await; attempt += 1; } Err(err) => { @@ -481,7 +475,7 @@ mod tests { #[tokio::test] async fn test_retry_succeeds_after_retryable_errors() { let mock = Arc::new(MockFailingStore::new(2, true)); - let store = RetryStore::new(mock, 3); + let store = RetryStore::new(mock, 3, 0); let result = store.store(vec![]).await; assert!(result.is_ok()); @@ -491,7 +485,7 @@ mod tests { #[tokio::test] async fn test_retry_exhausted_surfaces_error() { let mock = Arc::new(MockFailingStore::new(5, true)); - let store = RetryStore::new(mock, 3); + let store = RetryStore::new(mock, 3, 0); let result = store.store(vec![]).await; assert!(result.is_err()); @@ -500,7 +494,7 @@ mod tests { #[tokio::test] async fn test_non_retryable_error_not_retried() { let mock = Arc::new(MockFailingStore::new(1, false)); - let store = RetryStore::new(mock.clone(), 3); + let store = RetryStore::new(mock.clone(), 3, 0); let result = store.store(vec![]).await; assert!(result.is_err()); @@ -511,7 +505,7 @@ mod tests { #[tokio::test] async fn test_set_status_retries_on_retryable_error() { let mock = Arc::new(MockFailingStore::new(1, true)); - let store = RetryStore::new(mock, 3); + let store = RetryStore::new(mock, 3, 0); let result = store .set_status("test-id", InflightActivationStatus::Complete) @@ -522,7 +516,7 @@ mod tests { #[tokio::test] async fn test_claim_activations_retries_on_retryable_error() { let mock = Arc::new(MockFailingStore::new(2, true)); - let store = RetryStore::new(mock, 3); + let store = RetryStore::new(mock, 3, 0); let result = store .claim_activations(None, None, Some(1), None, true) @@ -533,7 +527,7 @@ mod tests { #[tokio::test] async fn test_mark_activation_processing_retries_on_retryable_error() { let mock = Arc::new(MockFailingStore::new(2, true)); - let store = RetryStore::new(mock, 3); + let store = RetryStore::new(mock, 3, 0); let result = store.mark_activation_processing("test-id").await; assert!(result.is_ok()); @@ -542,7 +536,7 @@ mod tests { #[tokio::test] async fn test_zero_retries_surfaces_immediately() { let mock = Arc::new(MockFailingStore::new(1, true)); - let store = RetryStore::new(mock, 0); + let store = RetryStore::new(mock, 0, 0); let result = store.store(vec![]).await; assert!(result.is_err()); @@ -561,7 +555,7 @@ mod tests { // Simulate main.rs wiring let store: Arc = match max_retries { - Some(n) => Arc::new(RetryStore::new(inner, n)), + Some(n) => Arc::new(RetryStore::new(inner, n, 0)), None => inner, }; @@ -588,14 +582,4 @@ mod tests { "unexpected".into() )))); } - - #[test] - fn test_backoff_duration() { - assert_eq!(backoff_duration(0), Duration::from_millis(100)); - assert_eq!(backoff_duration(1), Duration::from_millis(200)); - assert_eq!(backoff_duration(2), Duration::from_millis(400)); - assert_eq!(backoff_duration(3), Duration::from_millis(800)); - // Should cap at MAX_BACKOFF_MS - assert_eq!(backoff_duration(10), Duration::from_millis(MAX_BACKOFF_MS)); - } } From b2a5a58ed2301c12cf3a5628cab65a801edebc91 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 13 May 2026 14:59:39 -0400 Subject: [PATCH 07/12] remove redundant code, only clone arg --- src/store/retry.rs | 59 +++------------------------------------------- 1 file changed, 3 insertions(+), 56 deletions(-) diff --git a/src/store/retry.rs b/src/store/retry.rs index 330b68d7..12c15a41 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -99,63 +99,10 @@ macro_rules! retry_method { }}; } -/// Same as `retry_method!` but clones an owned argument before each attempt, -/// since the inner method consumes it. -macro_rules! retry_method_clone { - ($self:ident, $method:ident, $owned_arg:expr $(,)?) => {{ - let mut attempt = 0u32; - loop { - let cloned = $owned_arg.clone(); - match $self.inner.$method(cloned).await { - Ok(val) => { - if attempt > 0 { - info!( - method = stringify!($method), - attempt, - "Query succeeded after retry" - ); - metrics::counter!( - "store.retry.succeeded", - "method" => stringify!($method), - ) - .increment(1); - } - return Ok(val); - } - Err(err) if attempt < $self.max_retries && is_retryable_error(&err) => { - warn!( - method = stringify!($method), - attempt, - error = %err, - "Retryable database error, retrying" - ); - metrics::counter!( - "store.retry.attempt", - "method" => stringify!($method), - ) - .increment(1); - sleep($self.retry_delay).await; - attempt += 1; - } - Err(err) => { - if attempt > 0 { - metrics::counter!( - "store.retry.exhausted", - "method" => stringify!($method), - ) - .increment(1); - } - return Err(err); - } - } - } - }}; -} - #[async_trait] impl InflightActivationStore for RetryStore { async fn store(&self, batch: Vec) -> Result { - retry_method_clone!(self, store, batch) + retry_method!(self, store(batch.clone())) } fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { @@ -261,7 +208,7 @@ impl InflightActivationStore for RetryStore { } async fn mark_completed(&self, ids: Vec) -> Result { - retry_method_clone!(self, mark_completed, ids) + retry_method!(self, mark_completed(ids.clone())) } async fn remove_completed(&self) -> Result { @@ -269,7 +216,7 @@ impl InflightActivationStore for RetryStore { } async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - retry_method_clone!(self, remove_killswitched, killswitched_tasks) + retry_method!(self, remove_killswitched(killswitched_tasks.clone())) } async fn clear(&self) -> Result<(), Error> { From 73459f3848886dd1aa4964cb2d1d81d0cef534c7 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 15 May 2026 17:55:38 -0400 Subject: [PATCH 08/12] refactor first try --- benches/store_bench.rs | 8 + src/main.rs | 11 +- src/store/adapters/postgres.rs | 1100 ++++++++++++++++++-------------- src/store/adapters/sqlite.rs | 997 ++++++++++++++++------------- src/store/retry.rs | 561 ++++------------ src/store/tests.rs | 4 + 6 files changed, 1288 insertions(+), 1393 deletions(-) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 3c72d7e8..69fa7a7b 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -32,6 +32,10 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { processing_deadline_grace_sec: 3, claim_lease_ms: 5000, enable_sqlite_status_metrics: false, + retry_config: taskbroker::store::retry::RetryConfig { + max_retries: 0, + retry_delay: std::time::Duration::from_millis(0), + }, }, ) .await @@ -97,6 +101,10 @@ async fn set_status(num_activations: u32, num_workers: u32) { processing_deadline_grace_sec: 3, claim_lease_ms: 5000, enable_sqlite_status_metrics: false, + retry_config: taskbroker::store::retry::RetryConfig { + max_retries: 0, + retry_delay: std::time::Duration::from_millis(0), + }, }, ) .await diff --git a/src/main.rs b/src/main.rs index 48f9c038..c91a766f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,7 +37,6 @@ use taskbroker::store::adapters::postgres::{ PostgresActivationStore, PostgresActivationStoreConfig, }; use taskbroker::store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore}; -use taskbroker::store::retry::RetryStore; use taskbroker::store::traits::InflightActivationStore; use taskbroker::upkeep::upkeep; use taskbroker::{Args, get_version}; @@ -69,7 +68,7 @@ async fn main() -> Result<(), Error> { logging::init(logging::LoggingConfig::from_config(&config)); metrics::init(metrics::MetricsConfig::from_config(&config)); - let inner_store: Arc = match config.database_adapter { + let store: Arc = match config.database_adapter { DatabaseAdapter::Sqlite => Arc::new( SqliteActivationStore::new( &config.db_path, @@ -82,14 +81,6 @@ async fn main() -> Result<(), Error> { .await?, ), }; - let store: Arc = match config.db_query_max_retries { - Some(max_retries) => Arc::new(RetryStore::new( - inner_store, - max_retries, - config.db_query_retry_delay_ms, - )), - None => inner_store, - }; // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 8f087b59..4af0256a 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -16,10 +16,11 @@ use tracing::{instrument, warn}; use crate::config::Config; use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::retry::{RetryConfig, retry_query}; use crate::store::traits::InflightActivationStore; use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; -#[derive(Debug, FromRow)] +#[derive(Debug, Clone, FromRow)] struct TableRow { pub id: String, pub activation: Vec, @@ -139,6 +140,7 @@ pub struct PostgresActivationStoreConfig { pub claim_lease_ms: u64, pub vacuum_page_count: Option, pub enable_sqlite_status_metrics: bool, + pub retry_config: RetryConfig, } impl PostgresActivationStoreConfig { @@ -164,6 +166,7 @@ impl PostgresActivationStoreConfig { processing_deadline_grace_sec: config.processing_deadline_grace_sec, claim_lease_ms: config.fetch_batch_size.max(1) as u64 * config.push_queue_timeout_ms, enable_sqlite_status_metrics: config.enable_sqlite_status_metrics, + retry_config: RetryConfig::from_config(config), } } } @@ -262,67 +265,79 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn vacuum_db(&self) -> Result<(), Error> { - // TODO: Remove - Ok(()) + retry_query(&self.config.retry_config, "vacuum_db", || async { + // TODO: Remove + Ok(()) + }) + .await } /// Perform a full vacuum on the database. #[framed] async fn full_vacuum_db(&self) -> Result<(), Error> { - // TODO: Remove - Ok(()) + retry_query(&self.config.retry_config, "full_vacuum_db", || async { + // TODO: Remove + Ok(()) + }) + .await } /// Get the size of the database in bytes based on SQLite metadata queries. #[framed] async fn db_size(&self) -> Result { - let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size") - .bind(&self.config.pg_database_name) - .fetch_one(&self.read_pool) - .await?; - if row_result.0 < 0 { - return Ok(0); - } - Ok(row_result.0 as u64) + retry_query(&self.config.retry_config, "db_size", || async { + let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size") + .bind(&self.config.pg_database_name) + .fetch_one(&self.read_pool) + .await?; + if row_result.0 < 0 { + return Ok(0); + } + Ok(row_result.0 as u64) + }) + .await } /// Get an activation by id. Primarily used for testing #[framed] async fn get_by_id(&self, id: &str) -> Result, Error> { - let row_result: Option = sqlx::query_as( - " - SELECT id, - activation, - partition, - kafka_offset AS offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE id = $1 - ", - ) - .bind(id) - .fetch_optional(&self.read_pool) - .await?; + retry_query(&self.config.retry_config, "get_by_id", || async { + let row_result: Option = sqlx::query_as( + " + SELECT id, + activation, + partition, + kafka_offset AS offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + FROM inflight_taskactivations + WHERE id = $1 + ", + ) + .bind(id) + .fetch_optional(&self.read_pool) + .await?; - let Some(row) = row_result else { - return Ok(None); - }; + let Some(row) = row_result else { + return Ok(None); + }; - Ok(Some(row.into())) + Ok(Some(row.into())) + }) + .await } fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { @@ -339,74 +354,78 @@ impl InflightActivationStore for PostgresActivationStore { return Ok(0); } - let mut query_builder = QueryBuilder::::new( - " - INSERT INTO inflight_taskactivations - ( - id, - activation, - partition, - kafka_offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - ) - ", - ); let rows = batch .into_iter() .map(TableRow::try_from) .collect::, _>>()?; - let query = query_builder - .push_values(rows, |mut b, row| { - b.push_bind(row.id); - b.push_bind(row.activation); - b.push_bind(row.partition); - b.push_bind(row.offset); - b.push_bind(row.added_at); - b.push_bind(row.received_at); - b.push_bind(row.processing_attempts); - b.push_bind(row.expires_at); - b.push_bind(row.delay_until); - b.push_bind(row.processing_deadline_duration); - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline); - } else { - // Add a literal null - b.push("null"); - } - if let Some(exp) = row.claim_expires_at { - b.push_bind(exp); - } else { - b.push("null"); - } - b.push_bind(row.status); - b.push_bind(row.at_most_once); - b.push_bind(row.application); - b.push_bind(row.namespace); - b.push_bind(row.taskname); - b.push_bind(row.on_attempts_exceeded as i32); - b.push_bind(row.bucket); - }) - .push(" ON CONFLICT(id) DO NOTHING") - .build(); - - let mut conn = self.acquire_write_conn_metric("store").await?; - let result = query.execute(&mut *conn).await?; - Ok(result.rows_affected()) + retry_query(&self.config.retry_config, "store", || async { + let mut query_builder = QueryBuilder::::new( + " + INSERT INTO inflight_taskactivations + ( + id, + activation, + partition, + kafka_offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + ) + ", + ); + let query = query_builder + .push_values(rows.clone(), |mut b, row: TableRow| { + b.push_bind(row.id); + b.push_bind(row.activation); + b.push_bind(row.partition); + b.push_bind(row.offset); + b.push_bind(row.added_at); + b.push_bind(row.received_at); + b.push_bind(row.processing_attempts); + b.push_bind(row.expires_at); + b.push_bind(row.delay_until); + b.push_bind(row.processing_deadline_duration); + if let Some(deadline) = row.processing_deadline { + b.push_bind(deadline); + } else { + // Add a literal null + b.push("null"); + } + if let Some(exp) = row.claim_expires_at { + b.push_bind(exp); + } else { + b.push("null"); + } + b.push_bind(row.status); + b.push_bind(row.at_most_once); + b.push_bind(row.application); + b.push_bind(row.namespace); + b.push_bind(row.taskname); + b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); + }) + .push(" ON CONFLICT(id) DO NOTHING") + .build(); + + let mut conn = self.acquire_write_conn_metric("store").await?; + let result = query.execute(&mut *conn).await?; + + Ok(result.rows_affected()) + }) + .await } #[instrument(skip_all)] @@ -419,122 +438,134 @@ impl InflightActivationStore for PostgresActivationStore { bucket: Option, mark_processing: bool, ) -> Result, Error> { - let now = Utc::now(); - let grace_period = self.config.processing_deadline_grace_sec; let claim_lease_ms = self.config.claim_lease_ms as i64; - let mut query_builder = QueryBuilder::::new( - "WITH selected_activations AS ( - SELECT id - FROM inflight_taskactivations - WHERE status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now); - query_builder.push(")"); - - self.add_partition_condition(&mut query_builder, false); + retry_query(&self.config.retry_config, "claim_activations", || async { + let now = Utc::now(); - // Handle application & namespace filtering - if let Some(value) = application { - query_builder.push(" AND application ="); - query_builder.push_bind(value); - } - if let Some(namespaces) = namespaces - && !namespaces.is_empty() - { - query_builder.push(" AND namespace IN ("); - let mut separated = query_builder.separated(", "); - for namespace in namespaces.iter() { - separated.push_bind(namespace); - } + let mut query_builder = QueryBuilder::::new( + "WITH selected_activations AS ( + SELECT id + FROM inflight_taskactivations + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" AND (expires_at IS NULL OR expires_at > "); + query_builder.push_bind(now); query_builder.push(")"); - } - if let Some((min, max)) = bucket { - query_builder.push(" AND bucket >= "); - query_builder.push_bind(min); + self.add_partition_condition(&mut query_builder, false); - query_builder.push(" AND bucket <= "); - query_builder.push_bind(max); - } - - query_builder.push(" ORDER BY added_at"); - if let Some(limit) = limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - query_builder.push(" FOR UPDATE SKIP LOCKED)"); + // Handle application & namespace filtering + if let Some(value) = application { + query_builder.push(" AND application ="); + query_builder.push_bind(value); + } + if let Some(namespaces) = namespaces + && !namespaces.is_empty() + { + query_builder.push(" AND namespace IN ("); + let mut separated = query_builder.separated(", "); + for namespace in namespaces.iter() { + separated.push_bind(namespace); + } + query_builder.push(")"); + } - if mark_processing { - query_builder.push(format!( - "UPDATE inflight_taskactivations - SET processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), - claim_expires_at = NULL, - status = " - )); + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - } else { - query_builder.push(format!( - "UPDATE inflight_taskactivations - SET claim_expires_at = now() + ({claim_lease_ms} * interval '1 millisecond') + (interval '{grace_period} seconds'), - processing_deadline = NULL, - status = " - )); + query_builder.push(" AND bucket <= "); + query_builder.push_bind(max); + } - query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); - } + query_builder.push(" ORDER BY added_at"); + if let Some(limit) = limit { + query_builder.push(" LIMIT "); + query_builder.push_bind(limit); + } + query_builder.push(" FOR UPDATE SKIP LOCKED)"); + + if mark_processing { + query_builder.push(format!( + "UPDATE inflight_taskactivations + SET processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), + claim_expires_at = NULL, + status = " + )); + + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + } else { + query_builder.push(format!( + "UPDATE inflight_taskactivations + SET claim_expires_at = now() + ({claim_lease_ms} * interval '1 millisecond') + (interval '{grace_period} seconds'), + processing_deadline = NULL, + status = " + )); + + query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); + } - query_builder.push(" FROM selected_activations "); - query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); - query_builder.push(" RETURNING *, kafka_offset AS offset"); + query_builder.push(" FROM selected_activations "); + query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); + query_builder.push(" RETURNING *, kafka_offset AS offset"); - let mut conn = self.acquire_write_conn_metric("claim_activations").await?; - let rows: Vec = query_builder - .build_query_as::() - .fetch_all(&mut *conn) - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; + let rows: Vec = query_builder + .build_query_as::() + .fetch_all(&mut *conn) + .await?; - Ok(rows.into_iter().map(|row| row.into()).collect()) + Ok(rows.into_iter().map(|row| row.into()).collect()) + }) + .await } #[instrument(skip_all)] #[framed] async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("mark_activation_processing") - .await?; - let grace_period = self.config.processing_deadline_grace_sec; - let result = sqlx::query(&format!( - "UPDATE inflight_taskactivations SET - status = $1, - processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), - claim_expires_at = NULL - WHERE id = $2 AND status = $3", - )) - .bind(InflightActivationStatus::Processing.to_string()) - .bind(id) - .bind(InflightActivationStatus::Claimed.to_string()) - .execute(&mut *conn) - .await?; - if result.rows_affected() == 0 { - metrics::counter!("push.mark_activation_processing", "result" => "not_found") - .increment(1); + retry_query( + &self.config.retry_config, + "mark_activation_processing", + || async { + let mut conn = self + .acquire_write_conn_metric("mark_activation_processing") + .await?; - warn!( - task_id = %id, - "Activation could not be marked as processing, it may be missing or its status may have already changed" - ); - } else { - metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); - } + let result = sqlx::query(&format!( + "UPDATE inflight_taskactivations SET + status = $1, + processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), + claim_expires_at = NULL + WHERE id = $2 AND status = $3", + )) + .bind(InflightActivationStatus::Processing.to_string()) + .bind(id) + .bind(InflightActivationStatus::Claimed.to_string()) + .execute(&mut *conn) + .await?; - Ok(()) + if result.rows_affected() == 0 { + metrics::counter!("push.mark_activation_processing", "result" => "not_found") + .increment(1); + + warn!( + task_id = %id, + "Activation could not be marked as processing, it may be missing or its status may have already changed" + ); + } else { + metrics::counter!("push.mark_activation_processing", "result" => "ok") + .increment(1); + } + + Ok(()) + }, + ) + .await } /// Get the age of the oldest pending activation in seconds. @@ -579,55 +610,64 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let mut query_builder = QueryBuilder::new( - "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = ", - ); - query_builder.push_bind(status.to_string()); - self.add_partition_condition(&mut query_builder, false); - let result = query_builder - .build_query_as::<(i64,)>() - .fetch_one(&self.read_pool) - .await?; - Ok(result.0 as usize) + retry_query(&self.config.retry_config, "count_by_status", || async { + let mut query_builder = QueryBuilder::new( + "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = ", + ); + query_builder.push_bind(status.to_string()); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder + .build_query_as::<(i64,)>() + .fetch_one(&self.read_pool) + .await?; + Ok(result.0 as usize) + }) + .await } #[framed] async fn count(&self) -> Result { - let mut query_builder = - QueryBuilder::new("SELECT COUNT(*) as count FROM inflight_taskactivations"); - self.add_partition_condition(&mut query_builder, true); - let result = query_builder - .build_query_as::<(i64,)>() - .fetch_one(&self.read_pool) - .await?; - Ok(result.0 as usize) + retry_query(&self.config.retry_config, "count", || async { + let mut query_builder = + QueryBuilder::new("SELECT COUNT(*) as count FROM inflight_taskactivations"); + self.add_partition_condition(&mut query_builder, true); + let result = query_builder + .build_query_as::<(i64,)>() + .fetch_one(&self.read_pool) + .await?; + Ok(result.0 as usize) + }) + .await } #[instrument(skip_all)] #[framed] async fn count_depths(&self) -> Result { - // Notice that statuses are embedded into the query for simplicity - if the enum is every changed, this must change too! - let mut query_builder = QueryBuilder::new( - "SELECT COUNT(*) FILTER (WHERE status = 'Pending'), - COUNT(*) FILTER (WHERE status = 'Delay'), - COUNT(*) FILTER (WHERE status = 'Claimed'), - COUNT(*) FILTER (WHERE status = 'Processing') - FROM inflight_taskactivations", - ); + retry_query(&self.config.retry_config, "count_depths", || async { + // Notice that statuses are embedded into the query for simplicity - if the enum is every changed, this must change too! + let mut query_builder = QueryBuilder::new( + "SELECT COUNT(*) FILTER (WHERE status = 'Pending'), + COUNT(*) FILTER (WHERE status = 'Delay'), + COUNT(*) FILTER (WHERE status = 'Claimed'), + COUNT(*) FILTER (WHERE status = 'Processing') + FROM inflight_taskactivations", + ); - self.add_partition_condition(&mut query_builder, true); + self.add_partition_condition(&mut query_builder, true); - let row: (i64, i64, i64, i64) = query_builder - .build_query_as() - .fetch_one(&self.read_pool) - .await?; + let row: (i64, i64, i64, i64) = query_builder + .build_query_as() + .fetch_one(&self.read_pool) + .await?; - Ok(DepthCounts { - pending: row.0 as usize, - delay: row.1 as usize, - claimed: row.2 as usize, - processing: row.3 as usize, + Ok(DepthCounts { + pending: row.0 as usize, + delay: row.1 as usize, + claimed: row.2 as usize, + processing: row.3 as usize, + }) }) + .await } /// Update the status of a specific activation @@ -638,20 +678,23 @@ impl InflightActivationStore for PostgresActivationStore { id: &str, status: InflightActivationStatus, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; - let result: Option = sqlx::query_as( - "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", - ) - .bind(status.to_string()) - .bind(id) - .fetch_optional(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "set_status", || async { + let mut conn = self.acquire_write_conn_metric("set_status").await?; + let result: Option = sqlx::query_as( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", + ) + .bind(status.to_string()) + .bind(id) + .fetch_optional(&mut *conn) + .await?; - let Some(row) = result else { - return Ok(None); - }; + let Some(row) = result else { + return Ok(None); + }; - Ok(Some(row.into())) + Ok(Some(row.into())) + }) + .await } #[instrument(skip_all)] @@ -661,104 +704,132 @@ impl InflightActivationStore for PostgresActivationStore { id: &str, deadline: Option>, ) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("set_processing_deadline") - .await?; - sqlx::query("UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2") - .bind(deadline.unwrap()) - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query( + &self.config.retry_config, + "set_processing_deadline", + || async { + let mut conn = self + .acquire_write_conn_metric("set_processing_deadline") + .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2", + ) + .bind(deadline.unwrap()) + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }, + ) + .await } #[instrument(skip_all)] #[framed] async fn delete_activation(&self, id: &str) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("delete_activation").await?; - sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query(&self.config.retry_config, "delete_activation", || async { + let mut conn = self.acquire_write_conn_metric("delete_activation").await?; + sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } #[instrument(skip_all)] #[framed] async fn get_retry_activations(&self) -> Result, Error> { - let mut query_builder = QueryBuilder::new( - "SELECT id, - activation, - partition, - kafka_offset AS offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE status = ", - ); - query_builder.push_bind(InflightActivationStatus::Retry.to_string()); - self.add_partition_condition(&mut query_builder, false); - - Ok(query_builder - .build_query_as::() - .fetch_all(&self.read_pool) - .await? - .into_iter() - .map(|row| row.into()) - .collect()) + retry_query( + &self.config.retry_config, + "get_retry_activations", + || async { + let mut query_builder = QueryBuilder::new( + "SELECT id, + activation, + partition, + kafka_offset AS offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + FROM inflight_taskactivations + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Retry.to_string()); + self.add_partition_condition(&mut query_builder, false); + + Ok(query_builder + .build_query_as::() + .fetch_all(&self.read_pool) + .await? + .into_iter() + .map(|row: TableRow| row.into()) + .collect()) + }, + ) + .await } // Used in tests #[framed] async fn clear(&self) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("clear").await?; - sqlx::query("TRUNCATE TABLE inflight_taskactivations") - .execute(&mut *conn) - .await?; - - Ok(()) + retry_query(&self.config.retry_config, "clear", || async { + let mut conn = self.acquire_write_conn_metric("clear").await?; + sqlx::query("TRUNCATE TABLE inflight_taskactivations") + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } /// Revert expired push claims back to pending status. #[instrument(skip_all)] #[framed] async fn handle_claim_expiration(&self) -> Result { - let now = Utc::now(); - let mut conn = self - .acquire_write_conn_metric("handle_claim_expiration") - .await?; - - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET claim_expires_at = null, - status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push( - " WHERE claim_expires_at IS NOT NULL - AND claim_expires_at < ", - ); - query_builder.push_bind(now); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); - self.add_partition_condition(&mut query_builder, false); - - let released = query_builder.build().execute(&mut *conn).await?; + retry_query( + &self.config.retry_config, + "handle_claim_expiration", + || async { + let now = Utc::now(); + let mut conn = self + .acquire_write_conn_metric("handle_claim_expiration") + .await?; - Ok(released.rows_affected()) + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET claim_expires_at = null, + status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push( + " WHERE claim_expires_at IS NOT NULL + AND claim_expires_at < ", + ); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); + self.add_partition_condition(&mut query_builder, false); + + let released = query_builder.build().execute(&mut *conn).await?; + + Ok(released.rows_affected()) + }, + ) + .await } /// Update tasks that are in processing and have exceeded their processing deadline @@ -767,54 +838,61 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_processing_deadline(&self) -> Result { - let now = Utc::now(); - let mut atomic = self.write_pool.begin().await?; - - // At-most-once tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to at_most_once keys. - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = ", - ); - query_builder.push_bind(InflightActivationStatus::Failure.to_string()); - query_builder.push(" WHERE processing_deadline < "); - query_builder.push_bind(now); - query_builder.push(" AND at_most_once = TRUE AND status = "); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - - self.add_partition_condition(&mut query_builder, false); - - let most_once_result = query_builder.build().execute(&mut *atomic).await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } - - // Update regular tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(", processing_attempts = processing_attempts + 1"); - query_builder.push(" WHERE processing_deadline < "); - query_builder.push_bind(now); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - self.add_partition_condition(&mut query_builder, false); - - let result = query_builder.build().execute(&mut *atomic).await; - - atomic.commit().await?; + retry_query( + &self.config.retry_config, + "handle_processing_deadline", + || async { + let now = Utc::now(); + let mut atomic = self.write_pool.begin().await?; + + // At-most-once tasks that fail their processing deadlines go directly to failure + // there are no retries, as the worker will reject the task due to at_most_once keys. + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + query_builder.push(" WHERE processing_deadline < "); + query_builder.push_bind(now); + query_builder.push(" AND at_most_once = TRUE AND status = "); + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + + self.add_partition_condition(&mut query_builder, false); + + let most_once_result = query_builder.build().execute(&mut *atomic).await; + + let mut processing_deadline_modified_rows = 0; + if let Ok(query_res) = most_once_result { + processing_deadline_modified_rows = query_res.rows_affected(); + } - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } + // Update regular tasks. + // Increment processing_attempts by 1 and reset processing_deadline to null. + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(", processing_attempts = processing_attempts + 1"); + query_builder.push(" WHERE processing_deadline < "); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + self.add_partition_condition(&mut query_builder, false); + + let result = query_builder.build().execute(&mut *atomic).await; + + atomic.commit().await?; + + if let Ok(query_res) = result { + processing_deadline_modified_rows += query_res.rows_affected(); + return Ok(processing_deadline_modified_rows); + } - Err(anyhow!("Could not update tasks past processing_deadline")) + Err(anyhow!("Could not update tasks past processing_deadline")) + }, + ) + .await } /// Update tasks that have exceeded their max processing attempts. @@ -822,26 +900,33 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_processing_attempts(&self) -> Result { - let mut conn = self - .acquire_write_conn_metric("handle_processing_attempts") - .await?; - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET status = ", - ); - query_builder.push_bind(InflightActivationStatus::Failure.to_string()); - query_builder.push(" WHERE processing_attempts >= "); - query_builder.push_bind(self.config.max_processing_attempts as i32); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - self.add_partition_condition(&mut query_builder, false); - let processing_attempts_result = query_builder.build().execute(&mut *conn).await; - - if let Ok(query_res) = processing_attempts_result { - return Ok(query_res.rows_affected()); - } + retry_query( + &self.config.retry_config, + "handle_processing_attempts", + || async { + let mut conn = self + .acquire_write_conn_metric("handle_processing_attempts") + .await?; + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + query_builder.push(" WHERE processing_attempts >= "); + query_builder.push_bind(self.config.max_processing_attempts as i32); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + self.add_partition_condition(&mut query_builder, false); + let processing_attempts_result = query_builder.build().execute(&mut *conn).await; + + if let Ok(query_res) = processing_attempts_result { + return Ok(query_res.rows_affected()); + } - Err(anyhow!("Could not update tasks past processing_deadline")) + Err(anyhow!("Could not update tasks past processing_deadline")) + }, + ) + .await } /// Perform upkeep work for tasks that are past expires_at deadlines @@ -853,17 +938,20 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_expires_at(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(" AND expires_at IS NOT NULL AND expires_at < "); - query_builder.push_bind(now); - self.add_partition_condition(&mut query_builder, false); - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + retry_query(&self.config.retry_config, "handle_expires_at", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" AND expires_at IS NOT NULL AND expires_at < "); + query_builder.push_bind(now); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder.build().execute(&mut *conn).await?; + + Ok(result.rows_affected()) + }) + .await } /// Perform upkeep work for tasks that are past delay_until deadlines @@ -875,22 +963,25 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_delay_until(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; + retry_query(&self.config.retry_config, "handle_delay_until", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(" WHERE delay_until IS NOT NULL AND delay_until < "); - query_builder.push_bind(now); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Delay.to_string()); - self.add_partition_condition(&mut query_builder, false); - let update_result = query_builder.build().execute(&mut *conn).await?; - - Ok(update_result.rows_affected()) + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" WHERE delay_until IS NOT NULL AND delay_until < "); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Delay.to_string()); + self.add_partition_condition(&mut query_builder, false); + let update_result = query_builder.build().execute(&mut *conn).await?; + + Ok(update_result.rows_affected()) + }) + .await } /// Perform upkeep work related to status=failure @@ -902,39 +993,72 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_failed_tasks(&self) -> Result { - let mut atomic = self.write_pool.begin().await?; + retry_query( + &self.config.retry_config, + "handle_failed_tasks", + || async { + let mut atomic = self.write_pool.begin().await?; + + let mut query_builder = QueryBuilder::new( + "SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + self.add_partition_condition(&mut query_builder, false); + let failed_tasks = query_builder + .build_query_as::<(String, Vec, i32)>() + .fetch_all(&mut *atomic) + .await?; - let mut query_builder = QueryBuilder::new( - "SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = ", - ); - query_builder.push_bind(InflightActivationStatus::Failure.to_string()); - self.add_partition_condition(&mut query_builder, false); - let failed_tasks = query_builder - .build_query_as::<(String, Vec, i32)>() - .fetch_all(&mut *atomic) - .await?; + let mut forwarder = FailedTasksForwarder { + to_discard: vec![], + to_deadletter: vec![], + }; + + for record in failed_tasks.iter() { + let activation_data: &[u8] = record.1.as_slice(); + let id: String = record.0.clone(); + // We could be deadlettering because of activation.expires + // when a task expires we still deadletter if configured. + let on_attempts_exceeded: OnAttemptsExceeded = record.2.try_into().unwrap(); + if on_attempts_exceeded == OnAttemptsExceeded::Discard + || on_attempts_exceeded == OnAttemptsExceeded::Unspecified + { + forwarder.to_discard.push((id, activation_data.to_vec())) + } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { + forwarder.to_deadletter.push((id, activation_data.to_vec())) + } + } - let mut forwarder = FailedTasksForwarder { - to_discard: vec![], - to_deadletter: vec![], - }; - - for record in failed_tasks.iter() { - let activation_data: &[u8] = record.1.as_slice(); - let id: String = record.0.clone(); - // We could be deadlettering because of activation.expires - // when a task expires we still deadletter if configured. - let on_attempts_exceeded: OnAttemptsExceeded = record.2.try_into().unwrap(); - if on_attempts_exceeded == OnAttemptsExceeded::Discard - || on_attempts_exceeded == OnAttemptsExceeded::Unspecified - { - forwarder.to_discard.push((id, activation_data.to_vec())) - } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { - forwarder.to_deadletter.push((id, activation_data.to_vec())) - } - } + if !forwarder.to_discard.is_empty() { + let mut query_builder = + QueryBuilder::new("UPDATE inflight_taskactivations "); + query_builder + .push("SET status = ") + .push_bind(InflightActivationStatus::Complete.to_string()) + .push(" WHERE id IN ("); + + let mut separated = query_builder.separated(", "); + for (id, _body) in forwarder.to_discard.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + + query_builder.build().execute(&mut *atomic).await?; + } - if !forwarder.to_discard.is_empty() { + atomic.commit().await?; + + Ok(forwarder) + }, + ) + .await + } + + /// Mark a collection of tasks as complete by id + #[instrument(skip_all)] + #[framed] + async fn mark_completed(&self, ids: Vec) -> Result { + retry_query(&self.config.retry_config, "mark_completed", || async { let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); query_builder .push("SET status = ") @@ -942,38 +1066,16 @@ impl InflightActivationStore for PostgresActivationStore { .push(" WHERE id IN ("); let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { + for id in ids.iter() { separated.push_bind(id); } separated.push_unseparated(")"); + let mut conn = self.acquire_write_conn_metric("mark_completed").await?; + let result = query_builder.build().execute(&mut *conn).await?; - query_builder.build().execute(&mut *atomic).await?; - } - - atomic.commit().await?; - - Ok(forwarder) - } - - /// Mark a collection of tasks as complete by id - #[instrument(skip_all)] - #[framed] - async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete.to_string()) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - separated.push_bind(id); - } - separated.push_unseparated(")"); - let mut conn = self.acquire_write_conn_metric("mark_completed").await?; - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + Ok(result.rows_affected()) + }) + .await } /// Remove completed tasks. @@ -981,34 +1083,40 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn remove_completed(&self) -> Result { - let mut conn = self.acquire_write_conn_metric("remove_completed").await?; - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); - query_builder.push_bind(InflightActivationStatus::Complete.to_string()); - self.add_partition_condition(&mut query_builder, false); - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + retry_query(&self.config.retry_config, "remove_completed", || async { + let mut conn = self.acquire_write_conn_metric("remove_completed").await?; + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Complete.to_string()); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder.build().execute(&mut *conn).await?; + + Ok(result.rows_affected()) + }) + .await } /// Remove killswitched tasks. #[instrument(skip_all)] #[framed] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); - let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); - } - separated.push_unseparated(")"); - self.add_partition_condition(&mut query_builder, false); - let mut conn = self - .acquire_write_conn_metric("remove_killswitched") - .await?; - let query = query_builder.build().execute(&mut *conn).await?; + retry_query(&self.config.retry_config, "remove_killswitched", || async { + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); + let mut separated = query_builder.separated(", "); + for taskname in killswitched_tasks.iter() { + separated.push_bind(taskname); + } + separated.push_unseparated(")"); + self.add_partition_condition(&mut query_builder, false); + let mut conn = self + .acquire_write_conn_metric("remove_killswitched") + .await?; + let query = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } // Used in tests diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 8692ac0c..aeb24abf 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -25,10 +25,11 @@ use tracing::{instrument, warn}; use crate::config::Config; use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::retry::{RetryConfig, retry_query}; use crate::store::traits::InflightActivationStore; use crate::store::types::{BucketRange, FailedTasksForwarder}; -#[derive(Debug, FromRow)] +#[derive(Debug, Clone, FromRow)] pub struct TableRow { pub id: String, pub activation: Vec, @@ -143,6 +144,7 @@ pub struct InflightActivationStoreConfig { pub claim_lease_ms: u64, pub vacuum_page_count: Option, pub enable_sqlite_status_metrics: bool, + pub retry_config: RetryConfig, } impl InflightActivationStoreConfig { @@ -153,6 +155,7 @@ impl InflightActivationStoreConfig { processing_deadline_grace_sec: config.processing_deadline_grace_sec, claim_lease_ms: config.fetch_batch_size.max(1) as u64 * config.push_queue_timeout_ms, enable_sqlite_status_metrics: config.enable_sqlite_status_metrics, + retry_config: RetryConfig::from_config(config), } } } @@ -343,101 +346,64 @@ impl InflightActivationStore for SqliteActivationStore { async fn vacuum_db(&self) -> Result<(), Error> { let timer = Instant::now(); - if let Some(page_count) = self.config.vacuum_page_count { - let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; - sqlx::query(format!("PRAGMA incremental_vacuum({page_count})").as_str()) - .execute(&mut *conn) - .await?; - } else { - let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; - sqlx::query("PRAGMA incremental_vacuum") - .execute(&mut *conn) - .await?; - } - let freelist_count: i32 = sqlx::query("PRAGMA freelist_count") - .fetch_one(&self.read_pool) - .await? - .get("freelist_count"); + retry_query(&self.config.retry_config, "vacuum_db", || async { + if let Some(page_count) = self.config.vacuum_page_count { + let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; + sqlx::query(format!("PRAGMA incremental_vacuum({page_count})").as_str()) + .execute(&mut *conn) + .await?; + } else { + let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; + sqlx::query("PRAGMA incremental_vacuum") + .execute(&mut *conn) + .await?; + } + let freelist_count: i32 = sqlx::query("PRAGMA freelist_count") + .fetch_one(&self.read_pool) + .await? + .get("freelist_count"); + + metrics::gauge!("store.vacuum.freelist", "database" => "meta").set(freelist_count); + Ok(()) + }) + .await?; metrics::histogram!("store.vacuum", "database" => "meta").record(timer.elapsed()); - metrics::gauge!("store.vacuum.freelist", "database" => "meta").set(freelist_count); Ok(()) } /// Perform a full vacuum on the database. async fn full_vacuum_db(&self) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("full_vacuum_db").await?; - sqlx::query("VACUUM").execute(&mut *conn).await?; + retry_query(&self.config.retry_config, "full_vacuum_db", || async { + let mut conn = self.acquire_write_conn_metric("full_vacuum_db").await?; + sqlx::query("VACUUM").execute(&mut *conn).await?; + Ok(()) + }) + .await?; self.emit_db_status_metrics().await; Ok(()) } /// Get the size of the database in bytes based on SQLite metadata queries. async fn db_size(&self) -> Result { - let result: u64 = sqlx::query( - "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()", - ) - .fetch_one(&self.read_pool) - .await? - .get(0); - - Ok(result) + retry_query(&self.config.retry_config, "db_size", || async { + let result: u64 = sqlx::query( + "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()", + ) + .fetch_one(&self.read_pool) + .await? + .get(0); + Ok(result) + }) + .await } /// Get an activation by id. Primarily used for testing async fn get_by_id(&self, id: &str) -> Result, Error> { - let row_result: Option = sqlx::query_as( - " - SELECT id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE id = $1 - ", - ) - .bind(id) - .fetch_optional(&self.read_pool) - .await?; - - let Some(row) = row_result else { - return Ok(None); - }; - - Ok(Some(row.into())) - } - - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { - warn!("assign_partitions: {:?}", partitions); - Ok(()) - } - - #[instrument(skip_all)] - async fn store(&self, batch: Vec) -> Result { - if batch.is_empty() { - return Ok(0); - } - - let mut query_builder = QueryBuilder::::new( - " - INSERT INTO inflight_taskactivations - ( - id, + retry_query(&self.config.retry_config, "get_by_id", || async { + let row_result: Option = sqlx::query_as( + " + SELECT id, activation, partition, offset, @@ -456,73 +422,127 @@ impl InflightActivationStore for SqliteActivationStore { taskname, on_attempts_exceeded, bucket - ) - ", - ); + FROM inflight_taskactivations + WHERE id = $1 + ", + ) + .bind(id) + .fetch_optional(&self.read_pool) + .await?; + + let Some(row) = row_result else { + return Ok(None); + }; + + Ok(Some(row.into())) + }) + .await + } + + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + warn!("assign_partitions: {:?}", partitions); + Ok(()) + } + + #[instrument(skip_all)] + async fn store(&self, batch: Vec) -> Result { + if batch.is_empty() { + return Ok(0); + } + let rows = batch .into_iter() .map(TableRow::try_from) .collect::, _>>()?; - let query = query_builder - .push_values(rows, |mut b, row| { - b.push_bind(row.id); - b.push_bind(row.activation); - b.push_bind(row.partition); - b.push_bind(row.offset); - b.push_bind(row.added_at.timestamp()); - b.push_bind(row.received_at.timestamp()); - b.push_bind(row.processing_attempts); - b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); - b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); - b.push_bind(row.processing_deadline_duration); - - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline.timestamp()); - } else { - b.push("null"); + retry_query(&self.config.retry_config, "store", || async { + let mut query_builder = QueryBuilder::::new( + " + INSERT INTO inflight_taskactivations + ( + id, + activation, + partition, + offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + ) + ", + ); + let query = query_builder + .push_values(rows.clone(), |mut b, row: TableRow| { + b.push_bind(row.id); + b.push_bind(row.activation); + b.push_bind(row.partition); + b.push_bind(row.offset); + b.push_bind(row.added_at.timestamp()); + b.push_bind(row.received_at.timestamp()); + b.push_bind(row.processing_attempts); + b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); + b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); + b.push_bind(row.processing_deadline_duration); + + if let Some(deadline) = row.processing_deadline { + b.push_bind(deadline.timestamp()); + } else { + b.push("null"); + } + + if let Some(exp) = row.claim_expires_at { + b.push_bind(exp.timestamp()); + } else { + b.push("null"); + } + + b.push_bind(row.status); + b.push_bind(row.at_most_once); + b.push_bind(row.application); + b.push_bind(row.namespace); + b.push_bind(row.taskname); + b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); + }) + .push(" ON CONFLICT(id) DO NOTHING") + .build(); + let mut conn = self.acquire_write_conn_metric("store").await?; + let result = query.execute(&mut *conn).await?; + + // Sync the WAL into the main database so we don't lose data on host failure. + // Best-effort — errors are swallowed and only recorded as metrics. + let checkpoint_timer = Instant::now(); + let checkpoint_result = sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") + .fetch_one(&mut *conn) + .await; + match checkpoint_result { + Ok(row) => { + metrics::gauge!("store.passive_checkpoint_busy").set(row.get::("busy")); + metrics::gauge!("store.pages_written_to_wal").set(row.get::("log")); + metrics::gauge!("store.pages_committed_to_db") + .set(row.get::("checkpointed")); + metrics::gauge!("store.checkpoint.failed").set(0); } - - if let Some(exp) = row.claim_expires_at { - b.push_bind(exp.timestamp()); - } else { - b.push("null"); + Err(_e) => { + metrics::gauge!("store.checkpoint.failed").set(1); } - - b.push_bind(row.status); - b.push_bind(row.at_most_once); - b.push_bind(row.application); - b.push_bind(row.namespace); - b.push_bind(row.taskname); - b.push_bind(row.on_attempts_exceeded as i32); - b.push_bind(row.bucket); - }) - .push(" ON CONFLICT(id) DO NOTHING") - .build(); - let mut conn = self.acquire_write_conn_metric("store").await?; - let result = query.execute(&mut *conn).await?; - let rows_affected = Ok(result.rows_affected()); - - // Sync the WAL into the main database so we don't lose data on host failure. - let checkpoint_timer = Instant::now(); - let checkpoint_result = sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") - .fetch_one(&mut *conn) - .await; - match checkpoint_result { - Ok(row) => { - metrics::gauge!("store.passive_checkpoint_busy").set(row.get::("busy")); - metrics::gauge!("store.pages_written_to_wal").set(row.get::("log")); - metrics::gauge!("store.pages_committed_to_db") - .set(row.get::("checkpointed")); - metrics::gauge!("store.checkpoint.failed").set(0); } - Err(_e) => { - metrics::gauge!("store.checkpoint.failed").set(1); - } - } - metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); + metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); - rows_affected + Ok(result.rows_affected()) + }) + .await } #[instrument(skip_all)] @@ -534,86 +554,97 @@ impl InflightActivationStore for SqliteActivationStore { bucket: Option, mark_processing: bool, ) -> Result, Error> { - let now = Utc::now(); - let grace_period = self.config.processing_deadline_grace_sec; + retry_query(&self.config.retry_config, "claim_activations", || async { + let now = Utc::now(); + let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations SET "); + let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations SET "); - if mark_processing { - query_builder.push(format!( - "processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), claim_expires_at = NULL, status = " - )); + if mark_processing { + query_builder.push(format!( + "processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), claim_expires_at = NULL, status = " + )); - query_builder.push_bind(InflightActivationStatus::Processing); - } else { - query_builder.push(format!( - "claim_expires_at = unixepoch('now', '+' || {:.3} || ' seconds', '+' || {grace_period} || ' seconds'), processing_deadline = NULL, status = ", - self.config.claim_lease_ms as f64 / 1000.0, - )); + query_builder.push_bind(InflightActivationStatus::Processing); + } else { + query_builder.push(format!( + "claim_expires_at = unixepoch('now', '+' || {:.3} || ' seconds', '+' || {grace_period} || ' seconds'), processing_deadline = NULL, status = ", + self.config.claim_lease_ms as f64 / 1000.0, + )); - query_builder.push_bind(InflightActivationStatus::Claimed); - } + query_builder.push_bind(InflightActivationStatus::Claimed); + } - query_builder.push(" WHERE id IN (SELECT id FROM inflight_taskactivations WHERE status = "); - query_builder.push_bind(InflightActivationStatus::Pending); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now.timestamp()); - query_builder.push(")"); + query_builder + .push(" WHERE id IN (SELECT id FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Pending); + query_builder.push(" AND (expires_at IS NULL OR expires_at > "); + query_builder.push_bind(now.timestamp()); + query_builder.push(")"); - if let Some(value) = application { - query_builder.push(" AND application ="); - query_builder.push_bind(value); - } - if let Some(namespaces) = namespaces - && !namespaces.is_empty() - { - query_builder.push(" AND namespace IN ("); - let mut separated = query_builder.separated(", "); - for namespace in namespaces.iter() { - separated.push_bind(namespace); + if let Some(value) = application { + query_builder.push(" AND application ="); + query_builder.push_bind(value); } - query_builder.push(")"); - } - if let Some((min, max)) = bucket { - query_builder.push(" AND bucket >= "); - query_builder.push_bind(min); - query_builder.push(" AND bucket <= "); - query_builder.push_bind(max); - } - query_builder.push(" ORDER BY added_at"); - if let Some(limit) = limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - query_builder.push(") RETURNING *"); + if let Some(namespaces) = namespaces + && !namespaces.is_empty() + { + query_builder.push(" AND namespace IN ("); + let mut separated = query_builder.separated(", "); + for namespace in namespaces.iter() { + separated.push_bind(namespace); + } + query_builder.push(")"); + } + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); + query_builder.push(" AND bucket <= "); + query_builder.push_bind(max); + } + query_builder.push(" ORDER BY added_at"); + if let Some(limit) = limit { + query_builder.push(" LIMIT "); + query_builder.push_bind(limit); + } + query_builder.push(") RETURNING *"); - let mut conn = self.acquire_write_conn_metric("claim_activations").await?; - let rows: Vec = query_builder - .build_query_as::() - .fetch_all(&mut *conn) - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; + let rows: Vec = query_builder + .build_query_as::() + .fetch_all(&mut *conn) + .await?; - Ok(rows.into_iter().map(|row| row.into()).collect()) + Ok(rows.into_iter().map(|row| row.into()).collect()) + }) + .await } #[instrument(skip_all)] async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("mark_activation_processing") - .await?; - let grace_period = self.config.processing_deadline_grace_sec; - let result = sqlx::query(&format!( - "UPDATE inflight_taskactivations SET - status = $1, - processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), - claim_expires_at = NULL - WHERE id = $2 AND status = $3", - )) - .bind(InflightActivationStatus::Processing) - .bind(id) - .bind(InflightActivationStatus::Claimed) - .execute(&mut *conn) + let result = retry_query( + &self.config.retry_config, + "mark_activation_processing", + || async { + let mut conn = self + .acquire_write_conn_metric("mark_activation_processing") + .await?; + let result = sqlx::query(&format!( + "UPDATE inflight_taskactivations SET + status = $1, + processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), + claim_expires_at = NULL + WHERE id = $2 AND status = $3", + )) + .bind(InflightActivationStatus::Processing) + .bind(id) + .bind(InflightActivationStatus::Claimed) + .execute(&mut *conn) + .await?; + Ok(result) + }, + ) .await?; if result.rows_affected() == 0 { @@ -668,19 +699,26 @@ impl InflightActivationStore for SqliteActivationStore { #[instrument(skip_all)] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let result = - sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1") - .bind(status) - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + retry_query(&self.config.retry_config, "count_by_status", || async { + let result = sqlx::query( + "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1", + ) + .bind(status) + .fetch_one(&self.read_pool) + .await?; + Ok(result.get::("count") as usize) + }) + .await } async fn count(&self) -> Result { - let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + retry_query(&self.config.retry_config, "count", || async { + let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") + .fetch_one(&self.read_pool) + .await?; + Ok(result.get::("count") as usize) + }) + .await } /// Update the status of a specific activation @@ -690,20 +728,23 @@ impl InflightActivationStore for SqliteActivationStore { id: &str, status: InflightActivationStatus, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; - let result: Option = sqlx::query_as( - "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", - ) - .bind(status) - .bind(id) - .fetch_optional(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "set_status", || async { + let mut conn = self.acquire_write_conn_metric("set_status").await?; + let result: Option = sqlx::query_as( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", + ) + .bind(status) + .bind(id) + .fetch_optional(&mut *conn) + .await?; - let Some(row) = result else { - return Ok(None); - }; + let Some(row) = result else { + return Ok(None); + }; - Ok(Some(row.into())) + Ok(Some(row.into())) + }) + .await } #[instrument(skip_all)] @@ -712,93 +753,122 @@ impl InflightActivationStore for SqliteActivationStore { id: &str, deadline: Option>, ) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("set_processing_deadline") - .await?; - sqlx::query("UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2") - .bind(deadline.unwrap().timestamp()) - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query( + &self.config.retry_config, + "set_processing_deadline", + || async { + let mut conn = self + .acquire_write_conn_metric("set_processing_deadline") + .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2", + ) + .bind(deadline.unwrap().timestamp()) + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }, + ) + .await } #[instrument(skip_all)] async fn delete_activation(&self, id: &str) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("delete_activation").await?; - sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query(&self.config.retry_config, "delete_activation", || async { + let mut conn = self.acquire_write_conn_metric("delete_activation").await?; + sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } #[instrument(skip_all)] async fn get_retry_activations(&self) -> Result, Error> { - Ok(sqlx::query_as( - " - SELECT id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE status = $1 - ", + retry_query( + &self.config.retry_config, + "get_retry_activations", + || async { + Ok(sqlx::query_as( + " + SELECT id, + activation, + partition, + offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + FROM inflight_taskactivations + WHERE status = $1 + ", + ) + .bind(InflightActivationStatus::Retry) + .fetch_all(&self.read_pool) + .await? + .into_iter() + .map(|row: TableRow| row.into()) + .collect()) + }, ) - .bind(InflightActivationStatus::Retry) - .fetch_all(&self.read_pool) - .await? - .into_iter() - .map(|row: TableRow| row.into()) - .collect()) + .await } async fn clear(&self) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("clear").await?; - sqlx::query("DELETE FROM inflight_taskactivations") - .execute(&mut *conn) - .await?; - Ok(()) + retry_query(&self.config.retry_config, "clear", || async { + let mut conn = self.acquire_write_conn_metric("clear").await?; + sqlx::query("DELETE FROM inflight_taskactivations") + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } /// Expired push claims (`Claimed` + past `claim_expires_at`). #[instrument(skip_all)] async fn handle_claim_expiration(&self) -> Result { - let now = Utc::now(); - let mut conn = self - .acquire_write_conn_metric("handle_claim_expiration") - .await?; + retry_query( + &self.config.retry_config, + "handle_claim_expiration", + || async { + let now = Utc::now(); + let mut conn = self + .acquire_write_conn_metric("handle_claim_expiration") + .await?; + + let released = sqlx::query( + "UPDATE inflight_taskactivations + SET claim_expires_at = null, + status = $1 + WHERE claim_expires_at IS NOT NULL + AND claim_expires_at < $2 + AND status = $3", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Claimed) + .execute(&mut *conn) + .await?; - let released = sqlx::query( - "UPDATE inflight_taskactivations - SET claim_expires_at = null, - status = $1 - WHERE claim_expires_at IS NOT NULL - AND claim_expires_at < $2 - AND status = $3", + Ok(released.rows_affected()) + }, ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Claimed) - .execute(&mut *conn) - .await?; - - Ok(released.rows_affected()) + .await } /// Update tasks that are in processing and have exceeded their processing deadline @@ -806,73 +876,83 @@ impl InflightActivationStore for SqliteActivationStore { /// if a worker took the task and was killed, or failed. #[instrument(skip_all)] async fn handle_processing_deadline(&self) -> Result { - let now = Utc::now(); - let mut atomic = self.write_pool.begin().await?; - - // Idempotent tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to idempotency keys. - let most_once_result = sqlx::query( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", - ) - .bind(InflightActivationStatus::Failure) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) - .execute(&mut *atomic) - .await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + retry_query( + &self.config.retry_config, + "handle_processing_deadline", + || async { + let now = Utc::now(); + let mut atomic = self.write_pool.begin().await?; + + let most_once_result = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1 + WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", + ) + .bind(InflightActivationStatus::Failure) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Processing) + .execute(&mut *atomic) + .await; + + let mut processing_deadline_modified_rows = 0; + if let Ok(query_res) = most_once_result { + processing_deadline_modified_rows = query_res.rows_affected(); + } - // Update non-idempotent tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. - let result = sqlx::query( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) - .execute(&mut *atomic) - .await; + let result = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 + WHERE processing_deadline < $2 AND status = $3", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Processing) + .execute(&mut *atomic) + .await; - atomic.commit().await?; + atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } + if let Ok(query_res) = result { + processing_deadline_modified_rows += query_res.rows_affected(); + return Ok(processing_deadline_modified_rows); + } - Err(anyhow!("Could not update tasks past processing_deadline")) + Err(anyhow!("Could not update tasks past processing_deadline")) + }, + ) + .await } /// Update tasks that have exceeded their max processing attempts. /// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly. #[instrument(skip_all)] async fn handle_processing_attempts(&self) -> Result { - let mut conn = self - .acquire_write_conn_metric("handle_processing_attempts") - .await?; - let processing_attempts_result = sqlx::query( - "UPDATE inflight_taskactivations - SET status = $1 - WHERE processing_attempts >= $2 AND status = $3", - ) - .bind(InflightActivationStatus::Failure) - .bind(self.config.max_processing_attempts as i32) - .bind(InflightActivationStatus::Pending) - .execute(&mut *conn) - .await; + retry_query( + &self.config.retry_config, + "handle_processing_attempts", + || async { + let mut conn = self + .acquire_write_conn_metric("handle_processing_attempts") + .await?; + let processing_attempts_result = sqlx::query( + "UPDATE inflight_taskactivations + SET status = $1 + WHERE processing_attempts >= $2 AND status = $3", + ) + .bind(InflightActivationStatus::Failure) + .bind(self.config.max_processing_attempts as i32) + .bind(InflightActivationStatus::Pending) + .execute(&mut *conn) + .await; - if let Ok(query_res) = processing_attempts_result { - return Ok(query_res.rows_affected()); - } + if let Ok(query_res) = processing_attempts_result { + return Ok(query_res.rows_affected()); + } - Err(anyhow!("Could not update tasks past processing_deadline")) + Err(anyhow!("Could not update tasks past processing_deadline")) + }, + ) + .await } /// Perform upkeep work for tasks that are past expires_at deadlines @@ -883,17 +963,20 @@ impl InflightActivationStore for SqliteActivationStore { /// The number of impacted records is returned in a Result. #[instrument(skip_all)] async fn handle_expires_at(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; - let query = sqlx::query( - "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .execute(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "handle_expires_at", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; + let query = sqlx::query( + "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .execute(&mut *conn) + .await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } /// Perform upkeep work for tasks that are past delay_until deadlines @@ -904,21 +987,24 @@ impl InflightActivationStore for SqliteActivationStore { /// The number of impacted records is returned in a Result. #[instrument(skip_all)] async fn handle_delay_until(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - let update_result = sqlx::query( - r#"UPDATE inflight_taskactivations - SET status = $1 - WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 - "#, - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Delay) - .execute(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "handle_delay_until", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; + let update_result = sqlx::query( + r#"UPDATE inflight_taskactivations + SET status = $1 + WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 + "#, + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Delay) + .execute(&mut *conn) + .await?; - Ok(update_result.rows_affected()) + Ok(update_result.rows_affected()) + }) + .await } /// Perform upkeep work related to status=failure @@ -929,39 +1015,69 @@ impl InflightActivationStore for SqliteActivationStore { /// complete. #[instrument(skip_all)] async fn handle_failed_tasks(&self) -> Result { - let mut atomic = self.write_pool.begin().await?; + retry_query( + &self.config.retry_config, + "handle_failed_tasks", + || async { + let mut atomic = self.write_pool.begin().await?; + + let failed_tasks: Vec = + sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") + .bind(InflightActivationStatus::Failure) + .fetch_all(&mut *atomic) + .await? + .into_iter() + .collect(); + + let mut forwarder = FailedTasksForwarder { + to_discard: vec![], + to_deadletter: vec![], + }; + + for record in failed_tasks.iter() { + let activation_data: &[u8] = record.get("activation"); + let id: String = record.get("id"); + let on_attempts_exceeded_val: i32 = record.get("on_attempts_exceeded"); + let on_attempts_exceeded: OnAttemptsExceeded = + on_attempts_exceeded_val.try_into().unwrap(); + if on_attempts_exceeded == OnAttemptsExceeded::Discard + || on_attempts_exceeded == OnAttemptsExceeded::Unspecified + { + forwarder.to_discard.push((id, activation_data.to_vec())) + } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { + forwarder.to_deadletter.push((id, activation_data.to_vec())) + } + } - let failed_tasks: Vec = - sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Failure) - .fetch_all(&mut *atomic) - .await? - .into_iter() - .collect(); - - let mut forwarder = FailedTasksForwarder { - to_discard: vec![], - to_deadletter: vec![], - }; - - for record in failed_tasks.iter() { - let activation_data: &[u8] = record.get("activation"); - let id: String = record.get("id"); - // We could be deadlettering because of activation.expires - // when a task expires we still deadletter if configured. - let on_attempts_exceeded_val: i32 = record.get("on_attempts_exceeded"); - let on_attempts_exceeded: OnAttemptsExceeded = - on_attempts_exceeded_val.try_into().unwrap(); - if on_attempts_exceeded == OnAttemptsExceeded::Discard - || on_attempts_exceeded == OnAttemptsExceeded::Unspecified - { - forwarder.to_discard.push((id, activation_data.to_vec())) - } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { - forwarder.to_deadletter.push((id, activation_data.to_vec())) - } - } + if !forwarder.to_discard.is_empty() { + let mut query_builder = + QueryBuilder::new("UPDATE inflight_taskactivations "); + query_builder + .push("SET status = ") + .push_bind(InflightActivationStatus::Complete) + .push(" WHERE id IN ("); + + let mut separated = query_builder.separated(", "); + for (id, _body) in forwarder.to_discard.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + + query_builder.build().execute(&mut *atomic).await?; + } + + atomic.commit().await?; + + Ok(forwarder) + }, + ) + .await + } - if !forwarder.to_discard.is_empty() { + /// Mark a collection of tasks as complete by id + #[instrument(skip_all)] + async fn mark_completed(&self, ids: Vec) -> Result { + retry_query(&self.config.retry_config, "mark_completed", || async { let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); query_builder .push("SET status = ") @@ -969,67 +1085,52 @@ impl InflightActivationStore for SqliteActivationStore { .push(" WHERE id IN ("); let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { + for id in ids.iter() { separated.push_bind(id); } separated.push_unseparated(")"); + let mut conn = self.acquire_write_conn_metric("mark_completed").await?; + let result = query_builder.build().execute(&mut *conn).await?; - query_builder.build().execute(&mut *atomic).await?; - } - - atomic.commit().await?; - - Ok(forwarder) - } - - /// Mark a collection of tasks as complete by id - #[instrument(skip_all)] - async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - separated.push_bind(id); - } - separated.push_unseparated(")"); - let mut conn = self.acquire_write_conn_metric("mark_completed").await?; - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + Ok(result.rows_affected()) + }) + .await } /// Remove completed tasks. /// This method is a garbage collector for the inflight task store. #[instrument(skip_all)] async fn remove_completed(&self) -> Result { - let mut conn = self.acquire_write_conn_metric("remove_completed").await?; - let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Complete) - .execute(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "remove_completed", || async { + let mut conn = self.acquire_write_conn_metric("remove_completed").await?; + let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") + .bind(InflightActivationStatus::Complete) + .execute(&mut *conn) + .await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } /// Remove killswitched tasks. #[instrument(skip_all)] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); - let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); - } - separated.push_unseparated(")"); - let mut conn = self - .acquire_write_conn_metric("remove_killswitched") - .await?; - let query = query_builder.build().execute(&mut *conn).await?; + retry_query(&self.config.retry_config, "remove_killswitched", || async { + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); + let mut separated = query_builder.separated(", "); + for taskname in killswitched_tasks.iter() { + separated.push_bind(taskname); + } + separated.push_unseparated(")"); + let mut conn = self + .acquire_write_conn_metric("remove_killswitched") + .await?; + let query = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } } diff --git a/src/store/retry.rs b/src/store/retry.rs index 12c15a41..69410c19 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -1,15 +1,26 @@ -use std::sync::Arc; +use std::future::Future; use std::time::Duration; use anyhow::Error; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; use tokio::time::sleep; use tracing::{info, warn}; -use crate::store::activation::{InflightActivation, InflightActivationStatus}; -use crate::store::traits::InflightActivationStore; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::config::Config; + +/// Configuration for query-level retry behavior. +pub struct RetryConfig { + pub max_retries: u32, + pub retry_delay: Duration, +} + +impl RetryConfig { + pub fn from_config(config: &Config) -> Self { + Self { + max_retries: config.db_query_max_retries.unwrap_or(0), + retry_delay: Duration::from_millis(config.db_query_retry_delay_ms), + } + } +} /// Returns true if the error is a transient database/connection error /// that is likely to succeed on retry. Downcasts the anyhow::Error to @@ -24,490 +35,147 @@ fn is_retryable_error(err: &Error) -> bool { ) } -/// A wrapper around an `InflightActivationStore` that retries failed -/// database queries with a fixed delay between attempts. -pub struct RetryStore { - inner: Arc, - max_retries: u32, - retry_delay: Duration, -} - -impl RetryStore { - pub fn new( - inner: Arc, - max_retries: u32, - retry_delay_ms: u64, - ) -> Self { - Self { - inner, - max_retries, - retry_delay: Duration::from_millis(retry_delay_ms), - } - } -} - -/// Macro to reduce boilerplate for delegating trait methods with retry logic. -/// For each method call, if the inner store returns a retryable error, -/// we retry up to `self.max_retries` times with a fixed delay. -macro_rules! retry_method { - ($self:ident, $method:ident ( $($arg:expr),* $(,)? )) => {{ - let mut attempt = 0u32; - loop { - match $self.inner.$method( $($arg),* ).await { - Ok(val) => { - if attempt > 0 { - info!( - method = stringify!($method), - attempt, - "Query succeeded after retry" - ); - metrics::counter!( - "store.retry.succeeded", - "method" => stringify!($method), - ) - .increment(1); - } - return Ok(val); - } - Err(err) if attempt < $self.max_retries && is_retryable_error(&err) => { - warn!( - method = stringify!($method), - attempt, - error = %err, - "Retryable database error, retrying" - ); - metrics::counter!( - "store.retry.attempt", - "method" => stringify!($method), - ) - .increment(1); - sleep($self.retry_delay).await; - attempt += 1; +/// Retries a query-producing closure on transient database errors. +/// +/// The closure `f` is called on each attempt, producing a fresh future. +/// This ensures connection re-acquisition and query re-execution happen +/// naturally on each retry. +pub async fn retry_query( + config: &RetryConfig, + label: &'static str, + f: F, +) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + let mut attempt = 0u32; + loop { + match f().await { + Ok(val) => { + if attempt > 0 { + info!(label, attempt, "Query succeeded after retry"); + metrics::counter!("store.retry.succeeded", "method" => label).increment(1); } - Err(err) => { - if attempt > 0 { - metrics::counter!( - "store.retry.exhausted", - "method" => stringify!($method), - ) - .increment(1); - } - return Err(err); + return Ok(val); + } + Err(err) if attempt < config.max_retries && is_retryable_error(&err) => { + warn!( + label, + attempt, + error = %err, + "Retryable database error, retrying" + ); + metrics::counter!("store.retry.attempt", "method" => label).increment(1); + sleep(config.retry_delay).await; + attempt += 1; + } + Err(err) => { + if attempt > 0 { + metrics::counter!("store.retry.exhausted", "method" => label).increment(1); } + return Err(err); } } - }}; -} - -#[async_trait] -impl InflightActivationStore for RetryStore { - async fn store(&self, batch: Vec) -> Result { - retry_method!(self, store(batch.clone())) - } - - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { - self.inner.assign_partitions(partitions) - } - - async fn claim_activations( - &self, - application: Option<&str>, - namespaces: Option<&[String]>, - limit: Option, - bucket: Option, - mark_processing: bool, - ) -> Result, Error> { - retry_method!( - self, - claim_activations(application, namespaces, limit, bucket, mark_processing) - ) - } - - async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { - retry_method!(self, mark_activation_processing(id)) - } - - async fn set_status( - &self, - id: &str, - status: InflightActivationStatus, - ) -> Result, Error> { - retry_method!(self, set_status(id, status)) - } - - async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { - self.inner.pending_activation_max_lag(now).await - } - - async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - retry_method!(self, count_by_status(status)) - } - - async fn count(&self) -> Result { - retry_method!(self, count()) - } - - async fn get_by_id(&self, id: &str) -> Result, Error> { - retry_method!(self, get_by_id(id)) - } - - async fn count_depths(&self) -> Result { - retry_method!(self, count_depths()) - } - - async fn set_processing_deadline( - &self, - id: &str, - deadline: Option>, - ) -> Result<(), Error> { - retry_method!(self, set_processing_deadline(id, deadline)) - } - - async fn delete_activation(&self, id: &str) -> Result<(), Error> { - retry_method!(self, delete_activation(id)) - } - - async fn vacuum_db(&self) -> Result<(), Error> { - retry_method!(self, vacuum_db()) - } - - async fn full_vacuum_db(&self) -> Result<(), Error> { - retry_method!(self, full_vacuum_db()) - } - - async fn db_size(&self) -> Result { - retry_method!(self, db_size()) - } - - async fn get_retry_activations(&self) -> Result, Error> { - retry_method!(self, get_retry_activations()) - } - - async fn handle_claim_expiration(&self) -> Result { - retry_method!(self, handle_claim_expiration()) - } - - async fn handle_processing_deadline(&self) -> Result { - retry_method!(self, handle_processing_deadline()) - } - - async fn handle_processing_attempts(&self) -> Result { - retry_method!(self, handle_processing_attempts()) - } - - async fn handle_expires_at(&self) -> Result { - retry_method!(self, handle_expires_at()) - } - - async fn handle_delay_until(&self) -> Result { - retry_method!(self, handle_delay_until()) - } - - async fn handle_failed_tasks(&self) -> Result { - retry_method!(self, handle_failed_tasks()) - } - - async fn mark_completed(&self, ids: Vec) -> Result { - retry_method!(self, mark_completed(ids.clone())) - } - - async fn remove_completed(&self) -> Result { - retry_method!(self, remove_completed()) - } - - async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - retry_method!(self, remove_killswitched(killswitched_tasks.clone())) - } - - async fn clear(&self) -> Result<(), Error> { - retry_method!(self, clear()) - } - - async fn remove_db(&self) -> Result<(), Error> { - self.inner.remove_db().await } } #[cfg(test)] mod tests { use super::*; - use rstest::rstest; + use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; - /// Helper to create a retryable error (sqlx transient error wrapped in anyhow). fn retryable_error() -> Error { Error::from(sqlx::Error::PoolTimedOut) } - /// Helper to create a non-retryable error (sqlx database/logic error wrapped in anyhow). fn non_retryable_error() -> Error { Error::from(sqlx::Error::RowNotFound) } - /// A mock store that fails a set number of times before succeeding. - /// The `retryable` flag controls whether errors are transient (retryable) - /// or permanent (non-retryable). - struct MockFailingStore { - fail_count: AtomicU32, - retryable: bool, - } - - impl MockFailingStore { - fn new(fail_count: u32, retryable: bool) -> Self { - Self { - fail_count: AtomicU32::new(fail_count), - retryable, - } - } - - fn make_error(&self) -> Error { - if self.retryable { - retryable_error() - } else { - non_retryable_error() - } - } - - fn should_fail(&self) -> bool { - let remaining = self.fail_count.load(Ordering::SeqCst); - if remaining > 0 { - self.fail_count.fetch_sub(1, Ordering::SeqCst); - true - } else { - false - } + fn test_config(max_retries: u32) -> RetryConfig { + RetryConfig { + max_retries, + retry_delay: Duration::from_millis(0), } } - #[async_trait] - impl InflightActivationStore for MockFailingStore { - async fn store(&self, _batch: Vec) -> Result { - if self.should_fail() { - Err(self.make_error()) - } else { - Ok(1) - } - } - - fn assign_partitions(&self, _partitions: Vec) -> Result<(), Error> { - Ok(()) - } - - async fn claim_activations( - &self, - _application: Option<&str>, - _namespaces: Option<&[String]>, - _limit: Option, - _bucket: Option, - _mark_processing: bool, - ) -> Result, Error> { - if self.should_fail() { - Err(self.make_error()) - } else { - Ok(vec![]) - } - } - - async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> { - if self.should_fail() { - Err(self.make_error()) - } else { - Ok(()) - } - } + #[tokio::test] + async fn test_retry_succeeds_after_retryable_errors() { + let fail_count = AtomicU32::new(2); + let config = test_config(3); - async fn set_status( - &self, - _id: &str, - _status: InflightActivationStatus, - ) -> Result, Error> { - if self.should_fail() { - Err(self.make_error()) + let result = retry_query(&config, "test", || async { + let remaining = fail_count.load(Ordering::SeqCst); + if remaining > 0 { + fail_count.fetch_sub(1, Ordering::SeqCst); + Err(retryable_error()) } else { - Ok(None) + Ok(42u64) } - } - - async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { - 0.0 - } - - async fn count_by_status(&self, _status: InflightActivationStatus) -> Result { - Ok(0) - } - - async fn count(&self) -> Result { - Ok(0) - } - - async fn get_by_id(&self, _id: &str) -> Result, Error> { - Ok(None) - } - - async fn set_processing_deadline( - &self, - _id: &str, - _deadline: Option>, - ) -> Result<(), Error> { - Ok(()) - } - - async fn delete_activation(&self, _id: &str) -> Result<(), Error> { - Ok(()) - } - - async fn vacuum_db(&self) -> Result<(), Error> { - Ok(()) - } - - async fn full_vacuum_db(&self) -> Result<(), Error> { - Ok(()) - } - - async fn db_size(&self) -> Result { - Ok(0) - } - - async fn get_retry_activations(&self) -> Result, Error> { - Ok(vec![]) - } - - async fn handle_claim_expiration(&self) -> Result { - Ok(0) - } - - async fn handle_processing_deadline(&self) -> Result { - Ok(0) - } - - async fn handle_processing_attempts(&self) -> Result { - Ok(0) - } - - async fn handle_expires_at(&self) -> Result { - Ok(0) - } - - async fn handle_delay_until(&self) -> Result { - Ok(0) - } + }) + .await; - async fn handle_failed_tasks(&self) -> Result { - Ok(FailedTasksForwarder { - to_discard: vec![], - to_deadletter: vec![], - }) - } - - async fn mark_completed(&self, _ids: Vec) -> Result { - Ok(0) - } - - async fn remove_completed(&self) -> Result { - Ok(0) - } - - async fn remove_killswitched( - &self, - _killswitched_tasks: Vec, - ) -> Result { - Ok(0) - } - - async fn clear(&self) -> Result<(), Error> { - Ok(()) - } - } - - #[tokio::test] - async fn test_retry_succeeds_after_retryable_errors() { - let mock = Arc::new(MockFailingStore::new(2, true)); - let store = RetryStore::new(mock, 3, 0); - - let result = store.store(vec![]).await; assert!(result.is_ok()); - assert_eq!(result.unwrap(), 1); + assert_eq!(result.unwrap(), 42); } #[tokio::test] async fn test_retry_exhausted_surfaces_error() { - let mock = Arc::new(MockFailingStore::new(5, true)); - let store = RetryStore::new(mock, 3, 0); + let fail_count = AtomicU32::new(5); + let config = test_config(3); - let result = store.store(vec![]).await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_non_retryable_error_not_retried() { - let mock = Arc::new(MockFailingStore::new(1, false)); - let store = RetryStore::new(mock.clone(), 3, 0); + let result = retry_query(&config, "test", || async { + let remaining = fail_count.load(Ordering::SeqCst); + if remaining > 0 { + fail_count.fetch_sub(1, Ordering::SeqCst); + Err(retryable_error()) + } else { + Ok(42u64) + } + }) + .await; - let result = store.store(vec![]).await; assert!(result.is_err()); - // The fail count was decremented once (the initial attempt), no retries - assert_eq!(mock.fail_count.load(Ordering::SeqCst), 0); - } - - #[tokio::test] - async fn test_set_status_retries_on_retryable_error() { - let mock = Arc::new(MockFailingStore::new(1, true)); - let store = RetryStore::new(mock, 3, 0); - - let result = store - .set_status("test-id", InflightActivationStatus::Complete) - .await; - assert!(result.is_ok()); } #[tokio::test] - async fn test_claim_activations_retries_on_retryable_error() { - let mock = Arc::new(MockFailingStore::new(2, true)); - let store = RetryStore::new(mock, 3, 0); - - let result = store - .claim_activations(None, None, Some(1), None, true) - .await; - assert!(result.is_ok()); - } + async fn test_non_retryable_error_not_retried() { + let call_count = AtomicU32::new(0); + let config = test_config(3); - #[tokio::test] - async fn test_mark_activation_processing_retries_on_retryable_error() { - let mock = Arc::new(MockFailingStore::new(2, true)); - let store = RetryStore::new(mock, 3, 0); + let result = retry_query(&config, "test", || async { + call_count.fetch_add(1, Ordering::SeqCst); + Err::(non_retryable_error()) + }) + .await; - let result = store.mark_activation_processing("test-id").await; - assert!(result.is_ok()); + assert!(result.is_err()); + // Called only once — no retries for non-retryable errors + assert_eq!(call_count.load(Ordering::SeqCst), 1); } #[tokio::test] async fn test_zero_retries_surfaces_immediately() { - let mock = Arc::new(MockFailingStore::new(1, true)); - let store = RetryStore::new(mock, 0, 0); + let config = test_config(0); + + let result = retry_query(&config, "test", || async { + Err::(retryable_error()) + }) + .await; - let result = store.store(vec![]).await; assert!(result.is_err()); } - /// Mirrors the main.rs wiring: when max_retries is None, RetryStore is not - /// used and the inner store is called directly — retryable errors surface - /// immediately. When Some, RetryStore wraps the inner store and retries. - #[rstest] - #[case::none_bypasses_retry(None, true)] - #[case::some_enables_retry(Some(3), false)] #[tokio::test] - async fn test_config_retry_wiring(#[case] max_retries: Option, #[case] expect_err: bool) { - // Mock fails once with a retryable error then succeeds - let inner: Arc = Arc::new(MockFailingStore::new(1, true)); + async fn test_success_on_first_attempt_no_retry() { + let config = test_config(3); - // Simulate main.rs wiring - let store: Arc = match max_retries { - Some(n) => Arc::new(RetryStore::new(inner, n, 0)), - None => inner, - }; + let result = retry_query(&config, "test", || async { Ok(1u64) }).await; - let result = store.store(vec![]).await; - assert_eq!(result.is_err(), expect_err); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 1); } #[test] @@ -529,4 +197,19 @@ mod tests { "unexpected".into() )))); } + + #[test] + fn test_config_defaults_to_zero_retries() { + let config = RetryConfig::from_config(&Arc::new(Config::default())); + assert_eq!(config.max_retries, 0); + } + + #[test] + fn test_config_uses_configured_retries() { + let config = RetryConfig::from_config(&Arc::new(Config { + db_query_max_retries: Some(5), + ..Config::default() + })); + assert_eq!(config.max_retries, 5); + } } diff --git a/src/store/tests.rs b/src/store/tests.rs index 18df83aa..452dddb1 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -1928,6 +1928,10 @@ async fn test_db_status_calls_ok() { claim_lease_ms: 5000, vacuum_page_count: None, enable_sqlite_status_metrics: false, + retry_config: crate::store::retry::RetryConfig { + max_retries: 0, + retry_delay: std::time::Duration::from_millis(0), + }, }, ) .await From 3bcda754a608161fd62f9f1ae6ffb901a7c391d2 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 15 May 2026 18:39:41 -0400 Subject: [PATCH 09/12] remove some sites --- src/store/adapters/postgres.rs | 14 ++---- src/store/adapters/sqlite.rs | 91 ++++++++++++++++------------------ 2 files changed, 46 insertions(+), 59 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 4af0256a..1c365ae9 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -265,21 +265,15 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn vacuum_db(&self) -> Result<(), Error> { - retry_query(&self.config.retry_config, "vacuum_db", || async { - // TODO: Remove - Ok(()) - }) - .await + // TODO: Remove + Ok(()) } /// Perform a full vacuum on the database. #[framed] async fn full_vacuum_db(&self) -> Result<(), Error> { - retry_query(&self.config.retry_config, "full_vacuum_db", || async { - // TODO: Remove - Ok(()) - }) - .await + // TODO: Remove + Ok(()) } /// Get the size of the database in bytes based on SQLite metadata queries. diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index aeb24abf..17e0f20e 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -346,40 +346,31 @@ impl InflightActivationStore for SqliteActivationStore { async fn vacuum_db(&self) -> Result<(), Error> { let timer = Instant::now(); - retry_query(&self.config.retry_config, "vacuum_db", || async { - if let Some(page_count) = self.config.vacuum_page_count { - let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; - sqlx::query(format!("PRAGMA incremental_vacuum({page_count})").as_str()) - .execute(&mut *conn) - .await?; - } else { - let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; - sqlx::query("PRAGMA incremental_vacuum") - .execute(&mut *conn) - .await?; - } - let freelist_count: i32 = sqlx::query("PRAGMA freelist_count") - .fetch_one(&self.read_pool) - .await? - .get("freelist_count"); - - metrics::gauge!("store.vacuum.freelist", "database" => "meta").set(freelist_count); - Ok(()) - }) - .await?; + if let Some(page_count) = self.config.vacuum_page_count { + let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; + sqlx::query(format!("PRAGMA incremental_vacuum({page_count})").as_str()) + .execute(&mut *conn) + .await?; + } else { + let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; + sqlx::query("PRAGMA incremental_vacuum") + .execute(&mut *conn) + .await?; + } + let freelist_count: i32 = sqlx::query("PRAGMA freelist_count") + .fetch_one(&self.read_pool) + .await? + .get("freelist_count"); metrics::histogram!("store.vacuum", "database" => "meta").record(timer.elapsed()); + metrics::gauge!("store.vacuum.freelist", "database" => "meta").set(freelist_count); Ok(()) } /// Perform a full vacuum on the database. async fn full_vacuum_db(&self) -> Result<(), Error> { - retry_query(&self.config.retry_config, "full_vacuum_db", || async { - let mut conn = self.acquire_write_conn_metric("full_vacuum_db").await?; - sqlx::query("VACUUM").execute(&mut *conn).await?; - Ok(()) - }) - .await?; + let mut conn = self.acquire_write_conn_metric("full_vacuum_db").await?; + sqlx::query("VACUUM").execute(&mut *conn).await?; self.emit_db_status_metrics().await; Ok(()) } @@ -455,7 +446,7 @@ impl InflightActivationStore for SqliteActivationStore { .map(TableRow::try_from) .collect::, _>>()?; - retry_query(&self.config.retry_config, "store", || async { + let rows_affected = retry_query(&self.config.retry_config, "store", || async { let mut query_builder = QueryBuilder::::new( " INSERT INTO inflight_taskactivations @@ -519,30 +510,32 @@ impl InflightActivationStore for SqliteActivationStore { .build(); let mut conn = self.acquire_write_conn_metric("store").await?; let result = query.execute(&mut *conn).await?; + Ok(result.rows_affected()) + }) + .await?; - // Sync the WAL into the main database so we don't lose data on host failure. - // Best-effort — errors are swallowed and only recorded as metrics. - let checkpoint_timer = Instant::now(); - let checkpoint_result = sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") - .fetch_one(&mut *conn) - .await; - match checkpoint_result { - Ok(row) => { - metrics::gauge!("store.passive_checkpoint_busy").set(row.get::("busy")); - metrics::gauge!("store.pages_written_to_wal").set(row.get::("log")); - metrics::gauge!("store.pages_committed_to_db") - .set(row.get::("checkpointed")); - metrics::gauge!("store.checkpoint.failed").set(0); - } - Err(_e) => { - metrics::gauge!("store.checkpoint.failed").set(1); - } + // Sync the WAL into the main database so we don't lose data on host failure. + // Best-effort — errors are swallowed and only recorded as metrics. + let checkpoint_timer = Instant::now(); + let mut conn = self.acquire_write_conn_metric("store_checkpoint").await?; + let checkpoint_result = sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") + .fetch_one(&mut *conn) + .await; + match checkpoint_result { + Ok(row) => { + metrics::gauge!("store.passive_checkpoint_busy").set(row.get::("busy")); + metrics::gauge!("store.pages_written_to_wal").set(row.get::("log")); + metrics::gauge!("store.pages_committed_to_db") + .set(row.get::("checkpointed")); + metrics::gauge!("store.checkpoint.failed").set(0); + } + Err(_e) => { + metrics::gauge!("store.checkpoint.failed").set(1); } - metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); + } + metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); - Ok(result.rows_affected()) - }) - .await + Ok(rows_affected) } #[instrument(skip_all)] From 26cd1575e8fbceb103f39bd64e9a9a394bb5ce52 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 15 May 2026 19:01:45 -0400 Subject: [PATCH 10/12] change logging to debug level --- src/store/retry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/store/retry.rs b/src/store/retry.rs index 69410c19..03c994ed 100644 --- a/src/store/retry.rs +++ b/src/store/retry.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::Error; use tokio::time::sleep; -use tracing::{info, warn}; +use tracing::{debug, warn}; use crate::config::Config; @@ -54,7 +54,7 @@ where match f().await { Ok(val) => { if attempt > 0 { - info!(label, attempt, "Query succeeded after retry"); + debug!(label, attempt, "Query succeeded after retry"); metrics::counter!("store.retry.succeeded", "method" => label).increment(1); } return Ok(val); From 3adacade0dfc9359ae447fe1752124578a652f9d Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 15 May 2026 19:14:47 -0400 Subject: [PATCH 11/12] i think just formatting --- src/store/adapters/sqlite.rs | 46 ++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 17e0f20e..553f9f26 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -449,29 +449,29 @@ impl InflightActivationStore for SqliteActivationStore { let rows_affected = retry_query(&self.config.retry_config, "store", || async { let mut query_builder = QueryBuilder::::new( " - INSERT INTO inflight_taskactivations - ( - id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - ) - ", + INSERT INTO inflight_taskactivations + ( + id, + activation, + partition, + offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + ) + ", ); let query = query_builder .push_values(rows.clone(), |mut b, row: TableRow| { From 8867cac05cb4c7f6a5563ffba4f355e22235c54f Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 15 May 2026 19:36:13 -0400 Subject: [PATCH 12/12] unwrap more call sites --- src/store/adapters/postgres.rs | 85 ++++++++++++++++------------------ src/store/adapters/sqlite.rs | 71 +++++++++++++--------------- 2 files changed, 71 insertions(+), 85 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 1c365ae9..d9c8ac0a 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -832,61 +832,54 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_processing_deadline(&self) -> Result { - retry_query( - &self.config.retry_config, - "handle_processing_deadline", - || async { - let now = Utc::now(); - let mut atomic = self.write_pool.begin().await?; + let now = Utc::now(); + let mut atomic = self.write_pool.begin().await?; - // At-most-once tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to at_most_once keys. - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = ", - ); - query_builder.push_bind(InflightActivationStatus::Failure.to_string()); - query_builder.push(" WHERE processing_deadline < "); - query_builder.push_bind(now); - query_builder.push(" AND at_most_once = TRUE AND status = "); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + // At-most-once tasks that fail their processing deadlines go directly to failure + // there are no retries, as the worker will reject the task due to at_most_once keys. + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + query_builder.push(" WHERE processing_deadline < "); + query_builder.push_bind(now); + query_builder.push(" AND at_most_once = TRUE AND status = "); + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - self.add_partition_condition(&mut query_builder, false); + self.add_partition_condition(&mut query_builder, false); - let most_once_result = query_builder.build().execute(&mut *atomic).await; + let most_once_result = query_builder.build().execute(&mut *atomic).await; - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + let mut processing_deadline_modified_rows = 0; + if let Ok(query_res) = most_once_result { + processing_deadline_modified_rows = query_res.rows_affected(); + } - // Update regular tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(", processing_attempts = processing_attempts + 1"); - query_builder.push(" WHERE processing_deadline < "); - query_builder.push_bind(now); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - self.add_partition_condition(&mut query_builder, false); + // Update regular tasks. + // Increment processing_attempts by 1 and reset processing_deadline to null. + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(", processing_attempts = processing_attempts + 1"); + query_builder.push(" WHERE processing_deadline < "); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + self.add_partition_condition(&mut query_builder, false); - let result = query_builder.build().execute(&mut *atomic).await; + let result = query_builder.build().execute(&mut *atomic).await; - atomic.commit().await?; + atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } + if let Ok(query_res) = result { + processing_deadline_modified_rows += query_res.rows_affected(); + return Ok(processing_deadline_modified_rows); + } - Err(anyhow!("Could not update tasks past processing_deadline")) - }, - ) - .await + Err(anyhow!("Could not update tasks past processing_deadline")) } /// Update tasks that have exceeded their max processing attempts. diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 553f9f26..fdd55b5b 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -869,51 +869,44 @@ impl InflightActivationStore for SqliteActivationStore { /// if a worker took the task and was killed, or failed. #[instrument(skip_all)] async fn handle_processing_deadline(&self) -> Result { - retry_query( - &self.config.retry_config, - "handle_processing_deadline", - || async { - let now = Utc::now(); - let mut atomic = self.write_pool.begin().await?; + let now = Utc::now(); + let mut atomic = self.write_pool.begin().await?; - let most_once_result = sqlx::query( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", - ) - .bind(InflightActivationStatus::Failure) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) - .execute(&mut *atomic) - .await; + let most_once_result = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1 + WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", + ) + .bind(InflightActivationStatus::Failure) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Processing) + .execute(&mut *atomic) + .await; - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + let mut processing_deadline_modified_rows = 0; + if let Ok(query_res) = most_once_result { + processing_deadline_modified_rows = query_res.rows_affected(); + } - let result = sqlx::query( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) - .execute(&mut *atomic) - .await; + let result = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 + WHERE processing_deadline < $2 AND status = $3", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Processing) + .execute(&mut *atomic) + .await; - atomic.commit().await?; + atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } + if let Ok(query_res) = result { + processing_deadline_modified_rows += query_res.rows_affected(); + return Ok(processing_deadline_modified_rows); + } - Err(anyhow!("Could not update tasks past processing_deadline")) - }, - ) - .await + Err(anyhow!("Could not update tasks past processing_deadline")) } /// Update tasks that have exceeded their max processing attempts.