diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 3c72d7e8..69fa7a7b 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -32,6 +32,10 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { processing_deadline_grace_sec: 3, claim_lease_ms: 5000, enable_sqlite_status_metrics: false, + retry_config: taskbroker::store::retry::RetryConfig { + max_retries: 0, + retry_delay: std::time::Duration::from_millis(0), + }, }, ) .await @@ -97,6 +101,10 @@ async fn set_status(num_activations: u32, num_workers: u32) { processing_deadline_grace_sec: 3, claim_lease_ms: 5000, enable_sqlite_status_metrics: false, + retry_config: taskbroker::store::retry::RetryConfig { + max_retries: 0, + retry_delay: std::time::Duration::from_millis(0), + }, }, ) .await diff --git a/src/config.rs b/src/config.rs index 700f1a5a..3dd5d290 100644 --- a/src/config.rs +++ b/src/config.rs @@ -180,6 +180,13 @@ pub struct Config { /// The amount of time to wait before retrying writes to db when write fails. pub db_write_failure_backoff_ms: u64, + /// The maximum number of times to retry a transient database query error + /// before surfacing the error. When None, queries are not retried. + pub db_query_max_retries: Option, + + /// The delay in milliseconds between query retry attempts. + pub db_query_retry_delay_ms: u64, + /// The maximum number of tasks that are buffered /// before being written to InflightTaskStore (sqlite). pub db_insert_batch_max_len: usize, @@ -378,6 +385,8 @@ impl Default for Config { pg_default_database_name: "postgres".to_owned(), pg_extra_query_params: None, db_write_failure_backoff_ms: 4000, + db_query_max_retries: None, + db_query_retry_delay_ms: 100, db_insert_batch_max_len: 256, db_insert_batch_max_size: 16_000_000, db_insert_batch_max_time_ms: 1000, diff --git a/src/store.rs b/src/store.rs index bdc6e8df..c2d65951 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,5 +1,6 @@ pub mod activation; pub mod adapters; +pub mod retry; pub mod traits; pub mod types; diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 8f087b59..d9c8ac0a 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -16,10 +16,11 @@ use tracing::{instrument, warn}; use crate::config::Config; use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::retry::{RetryConfig, retry_query}; use crate::store::traits::InflightActivationStore; use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; -#[derive(Debug, FromRow)] +#[derive(Debug, Clone, FromRow)] struct TableRow { pub id: String, pub activation: Vec, @@ -139,6 +140,7 @@ pub struct PostgresActivationStoreConfig { pub claim_lease_ms: u64, pub vacuum_page_count: Option, pub enable_sqlite_status_metrics: bool, + pub retry_config: RetryConfig, } impl PostgresActivationStoreConfig { @@ -164,6 +166,7 @@ impl PostgresActivationStoreConfig { processing_deadline_grace_sec: config.processing_deadline_grace_sec, claim_lease_ms: config.fetch_batch_size.max(1) as u64 * config.push_queue_timeout_ms, enable_sqlite_status_metrics: config.enable_sqlite_status_metrics, + retry_config: RetryConfig::from_config(config), } } } @@ -276,53 +279,59 @@ impl InflightActivationStore for PostgresActivationStore { /// Get the size of the database in bytes based on SQLite metadata queries. #[framed] async fn db_size(&self) -> Result { - let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size") - .bind(&self.config.pg_database_name) - .fetch_one(&self.read_pool) - .await?; - if row_result.0 < 0 { - return Ok(0); - } - Ok(row_result.0 as u64) + retry_query(&self.config.retry_config, "db_size", || async { + let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size") + .bind(&self.config.pg_database_name) + .fetch_one(&self.read_pool) + .await?; + if row_result.0 < 0 { + return Ok(0); + } + Ok(row_result.0 as u64) + }) + .await } /// Get an activation by id. Primarily used for testing #[framed] async fn get_by_id(&self, id: &str) -> Result, Error> { - let row_result: Option = sqlx::query_as( - " - SELECT id, - activation, - partition, - kafka_offset AS offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE id = $1 - ", - ) - .bind(id) - .fetch_optional(&self.read_pool) - .await?; + retry_query(&self.config.retry_config, "get_by_id", || async { + let row_result: Option = sqlx::query_as( + " + SELECT id, + activation, + partition, + kafka_offset AS offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + FROM inflight_taskactivations + WHERE id = $1 + ", + ) + .bind(id) + .fetch_optional(&self.read_pool) + .await?; - let Some(row) = row_result else { - return Ok(None); - }; + let Some(row) = row_result else { + return Ok(None); + }; - Ok(Some(row.into())) + Ok(Some(row.into())) + }) + .await } fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { @@ -339,74 +348,78 @@ impl InflightActivationStore for PostgresActivationStore { return Ok(0); } - let mut query_builder = QueryBuilder::::new( - " - INSERT INTO inflight_taskactivations - ( - id, - activation, - partition, - kafka_offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - ) - ", - ); let rows = batch .into_iter() .map(TableRow::try_from) .collect::, _>>()?; - let query = query_builder - .push_values(rows, |mut b, row| { - b.push_bind(row.id); - b.push_bind(row.activation); - b.push_bind(row.partition); - b.push_bind(row.offset); - b.push_bind(row.added_at); - b.push_bind(row.received_at); - b.push_bind(row.processing_attempts); - b.push_bind(row.expires_at); - b.push_bind(row.delay_until); - b.push_bind(row.processing_deadline_duration); - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline); - } else { - // Add a literal null - b.push("null"); - } - if let Some(exp) = row.claim_expires_at { - b.push_bind(exp); - } else { - b.push("null"); - } - b.push_bind(row.status); - b.push_bind(row.at_most_once); - b.push_bind(row.application); - b.push_bind(row.namespace); - b.push_bind(row.taskname); - b.push_bind(row.on_attempts_exceeded as i32); - b.push_bind(row.bucket); - }) - .push(" ON CONFLICT(id) DO NOTHING") - .build(); - let mut conn = self.acquire_write_conn_metric("store").await?; - let result = query.execute(&mut *conn).await?; - - Ok(result.rows_affected()) + retry_query(&self.config.retry_config, "store", || async { + let mut query_builder = QueryBuilder::::new( + " + INSERT INTO inflight_taskactivations + ( + id, + activation, + partition, + kafka_offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + ) + ", + ); + let query = query_builder + .push_values(rows.clone(), |mut b, row: TableRow| { + b.push_bind(row.id); + b.push_bind(row.activation); + b.push_bind(row.partition); + b.push_bind(row.offset); + b.push_bind(row.added_at); + b.push_bind(row.received_at); + b.push_bind(row.processing_attempts); + b.push_bind(row.expires_at); + b.push_bind(row.delay_until); + b.push_bind(row.processing_deadline_duration); + if let Some(deadline) = row.processing_deadline { + b.push_bind(deadline); + } else { + // Add a literal null + b.push("null"); + } + if let Some(exp) = row.claim_expires_at { + b.push_bind(exp); + } else { + b.push("null"); + } + b.push_bind(row.status); + b.push_bind(row.at_most_once); + b.push_bind(row.application); + b.push_bind(row.namespace); + b.push_bind(row.taskname); + b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); + }) + .push(" ON CONFLICT(id) DO NOTHING") + .build(); + + let mut conn = self.acquire_write_conn_metric("store").await?; + let result = query.execute(&mut *conn).await?; + + Ok(result.rows_affected()) + }) + .await } #[instrument(skip_all)] @@ -419,122 +432,134 @@ impl InflightActivationStore for PostgresActivationStore { bucket: Option, mark_processing: bool, ) -> Result, Error> { - let now = Utc::now(); - let grace_period = self.config.processing_deadline_grace_sec; let claim_lease_ms = self.config.claim_lease_ms as i64; - let mut query_builder = QueryBuilder::::new( - "WITH selected_activations AS ( - SELECT id - FROM inflight_taskactivations - WHERE status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now); - query_builder.push(")"); + retry_query(&self.config.retry_config, "claim_activations", || async { + let now = Utc::now(); - self.add_partition_condition(&mut query_builder, false); - - // Handle application & namespace filtering - if let Some(value) = application { - query_builder.push(" AND application ="); - query_builder.push_bind(value); - } - if let Some(namespaces) = namespaces - && !namespaces.is_empty() - { - query_builder.push(" AND namespace IN ("); - let mut separated = query_builder.separated(", "); - for namespace in namespaces.iter() { - separated.push_bind(namespace); - } + let mut query_builder = QueryBuilder::::new( + "WITH selected_activations AS ( + SELECT id + FROM inflight_taskactivations + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" AND (expires_at IS NULL OR expires_at > "); + query_builder.push_bind(now); query_builder.push(")"); - } - - if let Some((min, max)) = bucket { - query_builder.push(" AND bucket >= "); - query_builder.push_bind(min); - query_builder.push(" AND bucket <= "); - query_builder.push_bind(max); - } + self.add_partition_condition(&mut query_builder, false); - query_builder.push(" ORDER BY added_at"); - if let Some(limit) = limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - query_builder.push(" FOR UPDATE SKIP LOCKED)"); + // Handle application & namespace filtering + if let Some(value) = application { + query_builder.push(" AND application ="); + query_builder.push_bind(value); + } + if let Some(namespaces) = namespaces + && !namespaces.is_empty() + { + query_builder.push(" AND namespace IN ("); + let mut separated = query_builder.separated(", "); + for namespace in namespaces.iter() { + separated.push_bind(namespace); + } + query_builder.push(")"); + } - if mark_processing { - query_builder.push(format!( - "UPDATE inflight_taskactivations - SET processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), - claim_expires_at = NULL, - status = " - )); + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - } else { - query_builder.push(format!( - "UPDATE inflight_taskactivations - SET claim_expires_at = now() + ({claim_lease_ms} * interval '1 millisecond') + (interval '{grace_period} seconds'), - processing_deadline = NULL, - status = " - )); + query_builder.push(" AND bucket <= "); + query_builder.push_bind(max); + } - query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); - } + query_builder.push(" ORDER BY added_at"); + if let Some(limit) = limit { + query_builder.push(" LIMIT "); + query_builder.push_bind(limit); + } + query_builder.push(" FOR UPDATE SKIP LOCKED)"); + + if mark_processing { + query_builder.push(format!( + "UPDATE inflight_taskactivations + SET processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), + claim_expires_at = NULL, + status = " + )); + + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + } else { + query_builder.push(format!( + "UPDATE inflight_taskactivations + SET claim_expires_at = now() + ({claim_lease_ms} * interval '1 millisecond') + (interval '{grace_period} seconds'), + processing_deadline = NULL, + status = " + )); + + query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); + } - query_builder.push(" FROM selected_activations "); - query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); - query_builder.push(" RETURNING *, kafka_offset AS offset"); + query_builder.push(" FROM selected_activations "); + query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); + query_builder.push(" RETURNING *, kafka_offset AS offset"); - let mut conn = self.acquire_write_conn_metric("claim_activations").await?; - let rows: Vec = query_builder - .build_query_as::() - .fetch_all(&mut *conn) - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; + let rows: Vec = query_builder + .build_query_as::() + .fetch_all(&mut *conn) + .await?; - Ok(rows.into_iter().map(|row| row.into()).collect()) + Ok(rows.into_iter().map(|row| row.into()).collect()) + }) + .await } #[instrument(skip_all)] #[framed] async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("mark_activation_processing") - .await?; - let grace_period = self.config.processing_deadline_grace_sec; - let result = sqlx::query(&format!( - "UPDATE inflight_taskactivations SET - status = $1, - processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), - claim_expires_at = NULL - WHERE id = $2 AND status = $3", - )) - .bind(InflightActivationStatus::Processing.to_string()) - .bind(id) - .bind(InflightActivationStatus::Claimed.to_string()) - .execute(&mut *conn) - .await?; - if result.rows_affected() == 0 { - metrics::counter!("push.mark_activation_processing", "result" => "not_found") - .increment(1); + retry_query( + &self.config.retry_config, + "mark_activation_processing", + || async { + let mut conn = self + .acquire_write_conn_metric("mark_activation_processing") + .await?; - warn!( - task_id = %id, - "Activation could not be marked as processing, it may be missing or its status may have already changed" - ); - } else { - metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); - } + let result = sqlx::query(&format!( + "UPDATE inflight_taskactivations SET + status = $1, + processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), + claim_expires_at = NULL + WHERE id = $2 AND status = $3", + )) + .bind(InflightActivationStatus::Processing.to_string()) + .bind(id) + .bind(InflightActivationStatus::Claimed.to_string()) + .execute(&mut *conn) + .await?; - Ok(()) + if result.rows_affected() == 0 { + metrics::counter!("push.mark_activation_processing", "result" => "not_found") + .increment(1); + + warn!( + task_id = %id, + "Activation could not be marked as processing, it may be missing or its status may have already changed" + ); + } else { + metrics::counter!("push.mark_activation_processing", "result" => "ok") + .increment(1); + } + + Ok(()) + }, + ) + .await } /// Get the age of the oldest pending activation in seconds. @@ -579,55 +604,64 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let mut query_builder = QueryBuilder::new( - "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = ", - ); - query_builder.push_bind(status.to_string()); - self.add_partition_condition(&mut query_builder, false); - let result = query_builder - .build_query_as::<(i64,)>() - .fetch_one(&self.read_pool) - .await?; - Ok(result.0 as usize) + retry_query(&self.config.retry_config, "count_by_status", || async { + let mut query_builder = QueryBuilder::new( + "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = ", + ); + query_builder.push_bind(status.to_string()); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder + .build_query_as::<(i64,)>() + .fetch_one(&self.read_pool) + .await?; + Ok(result.0 as usize) + }) + .await } #[framed] async fn count(&self) -> Result { - let mut query_builder = - QueryBuilder::new("SELECT COUNT(*) as count FROM inflight_taskactivations"); - self.add_partition_condition(&mut query_builder, true); - let result = query_builder - .build_query_as::<(i64,)>() - .fetch_one(&self.read_pool) - .await?; - Ok(result.0 as usize) + retry_query(&self.config.retry_config, "count", || async { + let mut query_builder = + QueryBuilder::new("SELECT COUNT(*) as count FROM inflight_taskactivations"); + self.add_partition_condition(&mut query_builder, true); + let result = query_builder + .build_query_as::<(i64,)>() + .fetch_one(&self.read_pool) + .await?; + Ok(result.0 as usize) + }) + .await } #[instrument(skip_all)] #[framed] async fn count_depths(&self) -> Result { - // Notice that statuses are embedded into the query for simplicity - if the enum is every changed, this must change too! - let mut query_builder = QueryBuilder::new( - "SELECT COUNT(*) FILTER (WHERE status = 'Pending'), - COUNT(*) FILTER (WHERE status = 'Delay'), - COUNT(*) FILTER (WHERE status = 'Claimed'), - COUNT(*) FILTER (WHERE status = 'Processing') - FROM inflight_taskactivations", - ); + retry_query(&self.config.retry_config, "count_depths", || async { + // Notice that statuses are embedded into the query for simplicity - if the enum is every changed, this must change too! + let mut query_builder = QueryBuilder::new( + "SELECT COUNT(*) FILTER (WHERE status = 'Pending'), + COUNT(*) FILTER (WHERE status = 'Delay'), + COUNT(*) FILTER (WHERE status = 'Claimed'), + COUNT(*) FILTER (WHERE status = 'Processing') + FROM inflight_taskactivations", + ); - self.add_partition_condition(&mut query_builder, true); + self.add_partition_condition(&mut query_builder, true); - let row: (i64, i64, i64, i64) = query_builder - .build_query_as() - .fetch_one(&self.read_pool) - .await?; + let row: (i64, i64, i64, i64) = query_builder + .build_query_as() + .fetch_one(&self.read_pool) + .await?; - Ok(DepthCounts { - pending: row.0 as usize, - delay: row.1 as usize, - claimed: row.2 as usize, - processing: row.3 as usize, + Ok(DepthCounts { + pending: row.0 as usize, + delay: row.1 as usize, + claimed: row.2 as usize, + processing: row.3 as usize, + }) }) + .await } /// Update the status of a specific activation @@ -638,20 +672,23 @@ impl InflightActivationStore for PostgresActivationStore { id: &str, status: InflightActivationStatus, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; - let result: Option = sqlx::query_as( - "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", - ) - .bind(status.to_string()) - .bind(id) - .fetch_optional(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "set_status", || async { + let mut conn = self.acquire_write_conn_metric("set_status").await?; + let result: Option = sqlx::query_as( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", + ) + .bind(status.to_string()) + .bind(id) + .fetch_optional(&mut *conn) + .await?; - let Some(row) = result else { - return Ok(None); - }; + let Some(row) = result else { + return Ok(None); + }; - Ok(Some(row.into())) + Ok(Some(row.into())) + }) + .await } #[instrument(skip_all)] @@ -661,104 +698,132 @@ impl InflightActivationStore for PostgresActivationStore { id: &str, deadline: Option>, ) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("set_processing_deadline") - .await?; - sqlx::query("UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2") - .bind(deadline.unwrap()) - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query( + &self.config.retry_config, + "set_processing_deadline", + || async { + let mut conn = self + .acquire_write_conn_metric("set_processing_deadline") + .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2", + ) + .bind(deadline.unwrap()) + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }, + ) + .await } #[instrument(skip_all)] #[framed] async fn delete_activation(&self, id: &str) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("delete_activation").await?; - sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query(&self.config.retry_config, "delete_activation", || async { + let mut conn = self.acquire_write_conn_metric("delete_activation").await?; + sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } #[instrument(skip_all)] #[framed] async fn get_retry_activations(&self) -> Result, Error> { - let mut query_builder = QueryBuilder::new( - "SELECT id, - activation, - partition, - kafka_offset AS offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE status = ", - ); - query_builder.push_bind(InflightActivationStatus::Retry.to_string()); - self.add_partition_condition(&mut query_builder, false); - - Ok(query_builder - .build_query_as::() - .fetch_all(&self.read_pool) - .await? - .into_iter() - .map(|row| row.into()) - .collect()) + retry_query( + &self.config.retry_config, + "get_retry_activations", + || async { + let mut query_builder = QueryBuilder::new( + "SELECT id, + activation, + partition, + kafka_offset AS offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + FROM inflight_taskactivations + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Retry.to_string()); + self.add_partition_condition(&mut query_builder, false); + + Ok(query_builder + .build_query_as::() + .fetch_all(&self.read_pool) + .await? + .into_iter() + .map(|row: TableRow| row.into()) + .collect()) + }, + ) + .await } // Used in tests #[framed] async fn clear(&self) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("clear").await?; - sqlx::query("TRUNCATE TABLE inflight_taskactivations") - .execute(&mut *conn) - .await?; - - Ok(()) + retry_query(&self.config.retry_config, "clear", || async { + let mut conn = self.acquire_write_conn_metric("clear").await?; + sqlx::query("TRUNCATE TABLE inflight_taskactivations") + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } /// Revert expired push claims back to pending status. #[instrument(skip_all)] #[framed] async fn handle_claim_expiration(&self) -> Result { - let now = Utc::now(); - let mut conn = self - .acquire_write_conn_metric("handle_claim_expiration") - .await?; - - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET claim_expires_at = null, - status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push( - " WHERE claim_expires_at IS NOT NULL - AND claim_expires_at < ", - ); - query_builder.push_bind(now); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); - self.add_partition_condition(&mut query_builder, false); - - let released = query_builder.build().execute(&mut *conn).await?; + retry_query( + &self.config.retry_config, + "handle_claim_expiration", + || async { + let now = Utc::now(); + let mut conn = self + .acquire_write_conn_metric("handle_claim_expiration") + .await?; - Ok(released.rows_affected()) + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET claim_expires_at = null, + status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push( + " WHERE claim_expires_at IS NOT NULL + AND claim_expires_at < ", + ); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Claimed.to_string()); + self.add_partition_condition(&mut query_builder, false); + + let released = query_builder.build().execute(&mut *conn).await?; + + Ok(released.rows_affected()) + }, + ) + .await } /// Update tasks that are in processing and have exceeded their processing deadline @@ -822,26 +887,33 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_processing_attempts(&self) -> Result { - let mut conn = self - .acquire_write_conn_metric("handle_processing_attempts") - .await?; - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET status = ", - ); - query_builder.push_bind(InflightActivationStatus::Failure.to_string()); - query_builder.push(" WHERE processing_attempts >= "); - query_builder.push_bind(self.config.max_processing_attempts as i32); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - self.add_partition_condition(&mut query_builder, false); - let processing_attempts_result = query_builder.build().execute(&mut *conn).await; - - if let Ok(query_res) = processing_attempts_result { - return Ok(query_res.rows_affected()); - } + retry_query( + &self.config.retry_config, + "handle_processing_attempts", + || async { + let mut conn = self + .acquire_write_conn_metric("handle_processing_attempts") + .await?; + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + query_builder.push(" WHERE processing_attempts >= "); + query_builder.push_bind(self.config.max_processing_attempts as i32); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + self.add_partition_condition(&mut query_builder, false); + let processing_attempts_result = query_builder.build().execute(&mut *conn).await; + + if let Ok(query_res) = processing_attempts_result { + return Ok(query_res.rows_affected()); + } - Err(anyhow!("Could not update tasks past processing_deadline")) + Err(anyhow!("Could not update tasks past processing_deadline")) + }, + ) + .await } /// Perform upkeep work for tasks that are past expires_at deadlines @@ -853,17 +925,20 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_expires_at(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(" AND expires_at IS NOT NULL AND expires_at < "); - query_builder.push_bind(now); - self.add_partition_condition(&mut query_builder, false); - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + retry_query(&self.config.retry_config, "handle_expires_at", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" AND expires_at IS NOT NULL AND expires_at < "); + query_builder.push_bind(now); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder.build().execute(&mut *conn).await?; + + Ok(result.rows_affected()) + }) + .await } /// Perform upkeep work for tasks that are past delay_until deadlines @@ -875,22 +950,25 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_delay_until(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - - let mut query_builder = QueryBuilder::new( - "UPDATE inflight_taskactivations - SET status = ", - ); - query_builder.push_bind(InflightActivationStatus::Pending.to_string()); - query_builder.push(" WHERE delay_until IS NOT NULL AND delay_until < "); - query_builder.push_bind(now); - query_builder.push(" AND status = "); - query_builder.push_bind(InflightActivationStatus::Delay.to_string()); - self.add_partition_condition(&mut query_builder, false); - let update_result = query_builder.build().execute(&mut *conn).await?; + retry_query(&self.config.retry_config, "handle_delay_until", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - Ok(update_result.rows_affected()) + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" WHERE delay_until IS NOT NULL AND delay_until < "); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Delay.to_string()); + self.add_partition_condition(&mut query_builder, false); + let update_result = query_builder.build().execute(&mut *conn).await?; + + Ok(update_result.rows_affected()) + }) + .await } /// Perform upkeep work related to status=failure @@ -902,39 +980,72 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn handle_failed_tasks(&self) -> Result { - let mut atomic = self.write_pool.begin().await?; + retry_query( + &self.config.retry_config, + "handle_failed_tasks", + || async { + let mut atomic = self.write_pool.begin().await?; + + let mut query_builder = QueryBuilder::new( + "SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + self.add_partition_condition(&mut query_builder, false); + let failed_tasks = query_builder + .build_query_as::<(String, Vec, i32)>() + .fetch_all(&mut *atomic) + .await?; - let mut query_builder = QueryBuilder::new( - "SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = ", - ); - query_builder.push_bind(InflightActivationStatus::Failure.to_string()); - self.add_partition_condition(&mut query_builder, false); - let failed_tasks = query_builder - .build_query_as::<(String, Vec, i32)>() - .fetch_all(&mut *atomic) - .await?; + let mut forwarder = FailedTasksForwarder { + to_discard: vec![], + to_deadletter: vec![], + }; + + for record in failed_tasks.iter() { + let activation_data: &[u8] = record.1.as_slice(); + let id: String = record.0.clone(); + // We could be deadlettering because of activation.expires + // when a task expires we still deadletter if configured. + let on_attempts_exceeded: OnAttemptsExceeded = record.2.try_into().unwrap(); + if on_attempts_exceeded == OnAttemptsExceeded::Discard + || on_attempts_exceeded == OnAttemptsExceeded::Unspecified + { + forwarder.to_discard.push((id, activation_data.to_vec())) + } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { + forwarder.to_deadletter.push((id, activation_data.to_vec())) + } + } - let mut forwarder = FailedTasksForwarder { - to_discard: vec![], - to_deadletter: vec![], - }; - - for record in failed_tasks.iter() { - let activation_data: &[u8] = record.1.as_slice(); - let id: String = record.0.clone(); - // We could be deadlettering because of activation.expires - // when a task expires we still deadletter if configured. - let on_attempts_exceeded: OnAttemptsExceeded = record.2.try_into().unwrap(); - if on_attempts_exceeded == OnAttemptsExceeded::Discard - || on_attempts_exceeded == OnAttemptsExceeded::Unspecified - { - forwarder.to_discard.push((id, activation_data.to_vec())) - } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { - forwarder.to_deadletter.push((id, activation_data.to_vec())) - } - } + if !forwarder.to_discard.is_empty() { + let mut query_builder = + QueryBuilder::new("UPDATE inflight_taskactivations "); + query_builder + .push("SET status = ") + .push_bind(InflightActivationStatus::Complete.to_string()) + .push(" WHERE id IN ("); + + let mut separated = query_builder.separated(", "); + for (id, _body) in forwarder.to_discard.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + + query_builder.build().execute(&mut *atomic).await?; + } + + atomic.commit().await?; - if !forwarder.to_discard.is_empty() { + Ok(forwarder) + }, + ) + .await + } + + /// Mark a collection of tasks as complete by id + #[instrument(skip_all)] + #[framed] + async fn mark_completed(&self, ids: Vec) -> Result { + retry_query(&self.config.retry_config, "mark_completed", || async { let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); query_builder .push("SET status = ") @@ -942,38 +1053,16 @@ impl InflightActivationStore for PostgresActivationStore { .push(" WHERE id IN ("); let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { + for id in ids.iter() { separated.push_bind(id); } separated.push_unseparated(")"); + let mut conn = self.acquire_write_conn_metric("mark_completed").await?; + let result = query_builder.build().execute(&mut *conn).await?; - query_builder.build().execute(&mut *atomic).await?; - } - - atomic.commit().await?; - - Ok(forwarder) - } - - /// Mark a collection of tasks as complete by id - #[instrument(skip_all)] - #[framed] - async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete.to_string()) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - separated.push_bind(id); - } - separated.push_unseparated(")"); - let mut conn = self.acquire_write_conn_metric("mark_completed").await?; - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + Ok(result.rows_affected()) + }) + .await } /// Remove completed tasks. @@ -981,34 +1070,40 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] #[framed] async fn remove_completed(&self) -> Result { - let mut conn = self.acquire_write_conn_metric("remove_completed").await?; - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); - query_builder.push_bind(InflightActivationStatus::Complete.to_string()); - self.add_partition_condition(&mut query_builder, false); - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + retry_query(&self.config.retry_config, "remove_completed", || async { + let mut conn = self.acquire_write_conn_metric("remove_completed").await?; + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Complete.to_string()); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder.build().execute(&mut *conn).await?; + + Ok(result.rows_affected()) + }) + .await } /// Remove killswitched tasks. #[instrument(skip_all)] #[framed] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); - let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); - } - separated.push_unseparated(")"); - self.add_partition_condition(&mut query_builder, false); - let mut conn = self - .acquire_write_conn_metric("remove_killswitched") - .await?; - let query = query_builder.build().execute(&mut *conn).await?; + retry_query(&self.config.retry_config, "remove_killswitched", || async { + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); + let mut separated = query_builder.separated(", "); + for taskname in killswitched_tasks.iter() { + separated.push_bind(taskname); + } + separated.push_unseparated(")"); + self.add_partition_condition(&mut query_builder, false); + let mut conn = self + .acquire_write_conn_metric("remove_killswitched") + .await?; + let query = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } // Used in tests diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 8692ac0c..fdd55b5b 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -25,10 +25,11 @@ use tracing::{instrument, warn}; use crate::config::Config; use crate::store::activation::{InflightActivation, InflightActivationStatus}; +use crate::store::retry::{RetryConfig, retry_query}; use crate::store::traits::InflightActivationStore; use crate::store::types::{BucketRange, FailedTasksForwarder}; -#[derive(Debug, FromRow)] +#[derive(Debug, Clone, FromRow)] pub struct TableRow { pub id: String, pub activation: Vec, @@ -143,6 +144,7 @@ pub struct InflightActivationStoreConfig { pub claim_lease_ms: u64, pub vacuum_page_count: Option, pub enable_sqlite_status_metrics: bool, + pub retry_config: RetryConfig, } impl InflightActivationStoreConfig { @@ -153,6 +155,7 @@ impl InflightActivationStoreConfig { processing_deadline_grace_sec: config.processing_deadline_grace_sec, claim_lease_ms: config.fetch_batch_size.max(1) as u64 * config.push_queue_timeout_ms, enable_sqlite_status_metrics: config.enable_sqlite_status_metrics, + retry_config: RetryConfig::from_config(config), } } } @@ -374,70 +377,24 @@ impl InflightActivationStore for SqliteActivationStore { /// Get the size of the database in bytes based on SQLite metadata queries. async fn db_size(&self) -> Result { - let result: u64 = sqlx::query( - "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()", - ) - .fetch_one(&self.read_pool) - .await? - .get(0); - - Ok(result) + retry_query(&self.config.retry_config, "db_size", || async { + let result: u64 = sqlx::query( + "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()", + ) + .fetch_one(&self.read_pool) + .await? + .get(0); + Ok(result) + }) + .await } /// Get an activation by id. Primarily used for testing async fn get_by_id(&self, id: &str) -> Result, Error> { - let row_result: Option = sqlx::query_as( - " - SELECT id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE id = $1 - ", - ) - .bind(id) - .fetch_optional(&self.read_pool) - .await?; - - let Some(row) = row_result else { - return Ok(None); - }; - - Ok(Some(row.into())) - } - - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { - warn!("assign_partitions: {:?}", partitions); - Ok(()) - } - - #[instrument(skip_all)] - async fn store(&self, batch: Vec) -> Result { - if batch.is_empty() { - return Ok(0); - } - - let mut query_builder = QueryBuilder::::new( - " - INSERT INTO inflight_taskactivations - ( - id, + retry_query(&self.config.retry_config, "get_by_id", || async { + let row_result: Option = sqlx::query_as( + " + SELECT id, activation, partition, offset, @@ -456,55 +413,111 @@ impl InflightActivationStore for SqliteActivationStore { taskname, on_attempts_exceeded, bucket - ) - ", - ); + FROM inflight_taskactivations + WHERE id = $1 + ", + ) + .bind(id) + .fetch_optional(&self.read_pool) + .await?; + + let Some(row) = row_result else { + return Ok(None); + }; + + Ok(Some(row.into())) + }) + .await + } + + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + warn!("assign_partitions: {:?}", partitions); + Ok(()) + } + + #[instrument(skip_all)] + async fn store(&self, batch: Vec) -> Result { + if batch.is_empty() { + return Ok(0); + } + let rows = batch .into_iter() .map(TableRow::try_from) .collect::, _>>()?; - let query = query_builder - .push_values(rows, |mut b, row| { - b.push_bind(row.id); - b.push_bind(row.activation); - b.push_bind(row.partition); - b.push_bind(row.offset); - b.push_bind(row.added_at.timestamp()); - b.push_bind(row.received_at.timestamp()); - b.push_bind(row.processing_attempts); - b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); - b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); - b.push_bind(row.processing_deadline_duration); - - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline.timestamp()); - } else { - b.push("null"); - } - - if let Some(exp) = row.claim_expires_at { - b.push_bind(exp.timestamp()); - } else { - b.push("null"); - } - - b.push_bind(row.status); - b.push_bind(row.at_most_once); - b.push_bind(row.application); - b.push_bind(row.namespace); - b.push_bind(row.taskname); - b.push_bind(row.on_attempts_exceeded as i32); - b.push_bind(row.bucket); - }) - .push(" ON CONFLICT(id) DO NOTHING") - .build(); - let mut conn = self.acquire_write_conn_metric("store").await?; - let result = query.execute(&mut *conn).await?; - let rows_affected = Ok(result.rows_affected()); + let rows_affected = retry_query(&self.config.retry_config, "store", || async { + let mut query_builder = QueryBuilder::::new( + " + INSERT INTO inflight_taskactivations + ( + id, + activation, + partition, + offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + ) + ", + ); + let query = query_builder + .push_values(rows.clone(), |mut b, row: TableRow| { + b.push_bind(row.id); + b.push_bind(row.activation); + b.push_bind(row.partition); + b.push_bind(row.offset); + b.push_bind(row.added_at.timestamp()); + b.push_bind(row.received_at.timestamp()); + b.push_bind(row.processing_attempts); + b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); + b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); + b.push_bind(row.processing_deadline_duration); + + if let Some(deadline) = row.processing_deadline { + b.push_bind(deadline.timestamp()); + } else { + b.push("null"); + } + + if let Some(exp) = row.claim_expires_at { + b.push_bind(exp.timestamp()); + } else { + b.push("null"); + } + + b.push_bind(row.status); + b.push_bind(row.at_most_once); + b.push_bind(row.application); + b.push_bind(row.namespace); + b.push_bind(row.taskname); + b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); + }) + .push(" ON CONFLICT(id) DO NOTHING") + .build(); + let mut conn = self.acquire_write_conn_metric("store").await?; + let result = query.execute(&mut *conn).await?; + Ok(result.rows_affected()) + }) + .await?; // Sync the WAL into the main database so we don't lose data on host failure. + // Best-effort — errors are swallowed and only recorded as metrics. let checkpoint_timer = Instant::now(); + let mut conn = self.acquire_write_conn_metric("store_checkpoint").await?; let checkpoint_result = sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") .fetch_one(&mut *conn) .await; @@ -522,7 +535,7 @@ impl InflightActivationStore for SqliteActivationStore { } metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); - rows_affected + Ok(rows_affected) } #[instrument(skip_all)] @@ -534,86 +547,97 @@ impl InflightActivationStore for SqliteActivationStore { bucket: Option, mark_processing: bool, ) -> Result, Error> { - let now = Utc::now(); - let grace_period = self.config.processing_deadline_grace_sec; + retry_query(&self.config.retry_config, "claim_activations", || async { + let now = Utc::now(); + let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations SET "); + let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations SET "); - if mark_processing { - query_builder.push(format!( - "processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), claim_expires_at = NULL, status = " - )); + if mark_processing { + query_builder.push(format!( + "processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), claim_expires_at = NULL, status = " + )); - query_builder.push_bind(InflightActivationStatus::Processing); - } else { - query_builder.push(format!( - "claim_expires_at = unixepoch('now', '+' || {:.3} || ' seconds', '+' || {grace_period} || ' seconds'), processing_deadline = NULL, status = ", - self.config.claim_lease_ms as f64 / 1000.0, - )); + query_builder.push_bind(InflightActivationStatus::Processing); + } else { + query_builder.push(format!( + "claim_expires_at = unixepoch('now', '+' || {:.3} || ' seconds', '+' || {grace_period} || ' seconds'), processing_deadline = NULL, status = ", + self.config.claim_lease_ms as f64 / 1000.0, + )); - query_builder.push_bind(InflightActivationStatus::Claimed); - } + query_builder.push_bind(InflightActivationStatus::Claimed); + } - query_builder.push(" WHERE id IN (SELECT id FROM inflight_taskactivations WHERE status = "); - query_builder.push_bind(InflightActivationStatus::Pending); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now.timestamp()); - query_builder.push(")"); + query_builder + .push(" WHERE id IN (SELECT id FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Pending); + query_builder.push(" AND (expires_at IS NULL OR expires_at > "); + query_builder.push_bind(now.timestamp()); + query_builder.push(")"); - if let Some(value) = application { - query_builder.push(" AND application ="); - query_builder.push_bind(value); - } - if let Some(namespaces) = namespaces - && !namespaces.is_empty() - { - query_builder.push(" AND namespace IN ("); - let mut separated = query_builder.separated(", "); - for namespace in namespaces.iter() { - separated.push_bind(namespace); + if let Some(value) = application { + query_builder.push(" AND application ="); + query_builder.push_bind(value); } - query_builder.push(")"); - } - if let Some((min, max)) = bucket { - query_builder.push(" AND bucket >= "); - query_builder.push_bind(min); - query_builder.push(" AND bucket <= "); - query_builder.push_bind(max); - } - query_builder.push(" ORDER BY added_at"); - if let Some(limit) = limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - query_builder.push(") RETURNING *"); + if let Some(namespaces) = namespaces + && !namespaces.is_empty() + { + query_builder.push(" AND namespace IN ("); + let mut separated = query_builder.separated(", "); + for namespace in namespaces.iter() { + separated.push_bind(namespace); + } + query_builder.push(")"); + } + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); + query_builder.push(" AND bucket <= "); + query_builder.push_bind(max); + } + query_builder.push(" ORDER BY added_at"); + if let Some(limit) = limit { + query_builder.push(" LIMIT "); + query_builder.push_bind(limit); + } + query_builder.push(") RETURNING *"); - let mut conn = self.acquire_write_conn_metric("claim_activations").await?; - let rows: Vec = query_builder - .build_query_as::() - .fetch_all(&mut *conn) - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; + let rows: Vec = query_builder + .build_query_as::() + .fetch_all(&mut *conn) + .await?; - Ok(rows.into_iter().map(|row| row.into()).collect()) + Ok(rows.into_iter().map(|row| row.into()).collect()) + }) + .await } #[instrument(skip_all)] async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("mark_activation_processing") - .await?; - let grace_period = self.config.processing_deadline_grace_sec; - let result = sqlx::query(&format!( - "UPDATE inflight_taskactivations SET - status = $1, - processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), - claim_expires_at = NULL - WHERE id = $2 AND status = $3", - )) - .bind(InflightActivationStatus::Processing) - .bind(id) - .bind(InflightActivationStatus::Claimed) - .execute(&mut *conn) + let result = retry_query( + &self.config.retry_config, + "mark_activation_processing", + || async { + let mut conn = self + .acquire_write_conn_metric("mark_activation_processing") + .await?; + let result = sqlx::query(&format!( + "UPDATE inflight_taskactivations SET + status = $1, + processing_deadline = unixepoch('now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'), + claim_expires_at = NULL + WHERE id = $2 AND status = $3", + )) + .bind(InflightActivationStatus::Processing) + .bind(id) + .bind(InflightActivationStatus::Claimed) + .execute(&mut *conn) + .await?; + Ok(result) + }, + ) .await?; if result.rows_affected() == 0 { @@ -668,19 +692,26 @@ impl InflightActivationStore for SqliteActivationStore { #[instrument(skip_all)] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let result = - sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1") - .bind(status) - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + retry_query(&self.config.retry_config, "count_by_status", || async { + let result = sqlx::query( + "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1", + ) + .bind(status) + .fetch_one(&self.read_pool) + .await?; + Ok(result.get::("count") as usize) + }) + .await } async fn count(&self) -> Result { - let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + retry_query(&self.config.retry_config, "count", || async { + let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") + .fetch_one(&self.read_pool) + .await?; + Ok(result.get::("count") as usize) + }) + .await } /// Update the status of a specific activation @@ -690,20 +721,23 @@ impl InflightActivationStore for SqliteActivationStore { id: &str, status: InflightActivationStatus, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; - let result: Option = sqlx::query_as( - "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", - ) - .bind(status) - .bind(id) - .fetch_optional(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "set_status", || async { + let mut conn = self.acquire_write_conn_metric("set_status").await?; + let result: Option = sqlx::query_as( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", + ) + .bind(status) + .bind(id) + .fetch_optional(&mut *conn) + .await?; - let Some(row) = result else { - return Ok(None); - }; + let Some(row) = result else { + return Ok(None); + }; - Ok(Some(row.into())) + Ok(Some(row.into())) + }) + .await } #[instrument(skip_all)] @@ -712,93 +746,122 @@ impl InflightActivationStore for SqliteActivationStore { id: &str, deadline: Option>, ) -> Result<(), Error> { - let mut conn = self - .acquire_write_conn_metric("set_processing_deadline") - .await?; - sqlx::query("UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2") - .bind(deadline.unwrap().timestamp()) - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query( + &self.config.retry_config, + "set_processing_deadline", + || async { + let mut conn = self + .acquire_write_conn_metric("set_processing_deadline") + .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2", + ) + .bind(deadline.unwrap().timestamp()) + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }, + ) + .await } #[instrument(skip_all)] async fn delete_activation(&self, id: &str) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("delete_activation").await?; - sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") - .bind(id) - .execute(&mut *conn) - .await?; - Ok(()) + retry_query(&self.config.retry_config, "delete_activation", || async { + let mut conn = self.acquire_write_conn_metric("delete_activation").await?; + sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") + .bind(id) + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } #[instrument(skip_all)] async fn get_retry_activations(&self) -> Result, Error> { - Ok(sqlx::query_as( - " - SELECT id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - claim_expires_at, - status, - at_most_once, - application, - namespace, - taskname, - on_attempts_exceeded, - bucket - FROM inflight_taskactivations - WHERE status = $1 - ", + retry_query( + &self.config.retry_config, + "get_retry_activations", + || async { + Ok(sqlx::query_as( + " + SELECT id, + activation, + partition, + offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + claim_expires_at, + status, + at_most_once, + application, + namespace, + taskname, + on_attempts_exceeded, + bucket + FROM inflight_taskactivations + WHERE status = $1 + ", + ) + .bind(InflightActivationStatus::Retry) + .fetch_all(&self.read_pool) + .await? + .into_iter() + .map(|row: TableRow| row.into()) + .collect()) + }, ) - .bind(InflightActivationStatus::Retry) - .fetch_all(&self.read_pool) - .await? - .into_iter() - .map(|row: TableRow| row.into()) - .collect()) + .await } async fn clear(&self) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("clear").await?; - sqlx::query("DELETE FROM inflight_taskactivations") - .execute(&mut *conn) - .await?; - Ok(()) + retry_query(&self.config.retry_config, "clear", || async { + let mut conn = self.acquire_write_conn_metric("clear").await?; + sqlx::query("DELETE FROM inflight_taskactivations") + .execute(&mut *conn) + .await?; + Ok(()) + }) + .await } /// Expired push claims (`Claimed` + past `claim_expires_at`). #[instrument(skip_all)] async fn handle_claim_expiration(&self) -> Result { - let now = Utc::now(); - let mut conn = self - .acquire_write_conn_metric("handle_claim_expiration") - .await?; + retry_query( + &self.config.retry_config, + "handle_claim_expiration", + || async { + let now = Utc::now(); + let mut conn = self + .acquire_write_conn_metric("handle_claim_expiration") + .await?; + + let released = sqlx::query( + "UPDATE inflight_taskactivations + SET claim_expires_at = null, + status = $1 + WHERE claim_expires_at IS NOT NULL + AND claim_expires_at < $2 + AND status = $3", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Claimed) + .execute(&mut *conn) + .await?; - let released = sqlx::query( - "UPDATE inflight_taskactivations - SET claim_expires_at = null, - status = $1 - WHERE claim_expires_at IS NOT NULL - AND claim_expires_at < $2 - AND status = $3", + Ok(released.rows_affected()) + }, ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Claimed) - .execute(&mut *conn) - .await?; - - Ok(released.rows_affected()) + .await } /// Update tasks that are in processing and have exceeded their processing deadline @@ -809,8 +872,6 @@ impl InflightActivationStore for SqliteActivationStore { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; - // Idempotent tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to idempotency keys. let most_once_result = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1 @@ -827,8 +888,6 @@ impl InflightActivationStore for SqliteActivationStore { processing_deadline_modified_rows = query_res.rows_affected(); } - // Update non-idempotent tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. let result = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 @@ -854,25 +913,32 @@ impl InflightActivationStore for SqliteActivationStore { /// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly. #[instrument(skip_all)] async fn handle_processing_attempts(&self) -> Result { - let mut conn = self - .acquire_write_conn_metric("handle_processing_attempts") - .await?; - let processing_attempts_result = sqlx::query( - "UPDATE inflight_taskactivations - SET status = $1 - WHERE processing_attempts >= $2 AND status = $3", - ) - .bind(InflightActivationStatus::Failure) - .bind(self.config.max_processing_attempts as i32) - .bind(InflightActivationStatus::Pending) - .execute(&mut *conn) - .await; + retry_query( + &self.config.retry_config, + "handle_processing_attempts", + || async { + let mut conn = self + .acquire_write_conn_metric("handle_processing_attempts") + .await?; + let processing_attempts_result = sqlx::query( + "UPDATE inflight_taskactivations + SET status = $1 + WHERE processing_attempts >= $2 AND status = $3", + ) + .bind(InflightActivationStatus::Failure) + .bind(self.config.max_processing_attempts as i32) + .bind(InflightActivationStatus::Pending) + .execute(&mut *conn) + .await; - if let Ok(query_res) = processing_attempts_result { - return Ok(query_res.rows_affected()); - } + if let Ok(query_res) = processing_attempts_result { + return Ok(query_res.rows_affected()); + } - Err(anyhow!("Could not update tasks past processing_deadline")) + Err(anyhow!("Could not update tasks past processing_deadline")) + }, + ) + .await } /// Perform upkeep work for tasks that are past expires_at deadlines @@ -883,17 +949,20 @@ impl InflightActivationStore for SqliteActivationStore { /// The number of impacted records is returned in a Result. #[instrument(skip_all)] async fn handle_expires_at(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; - let query = sqlx::query( - "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .execute(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "handle_expires_at", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; + let query = sqlx::query( + "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .execute(&mut *conn) + .await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } /// Perform upkeep work for tasks that are past delay_until deadlines @@ -904,21 +973,24 @@ impl InflightActivationStore for SqliteActivationStore { /// The number of impacted records is returned in a Result. #[instrument(skip_all)] async fn handle_delay_until(&self) -> Result { - let now = Utc::now(); - let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - let update_result = sqlx::query( - r#"UPDATE inflight_taskactivations - SET status = $1 - WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 - "#, - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Delay) - .execute(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "handle_delay_until", || async { + let now = Utc::now(); + let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; + let update_result = sqlx::query( + r#"UPDATE inflight_taskactivations + SET status = $1 + WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 + "#, + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Delay) + .execute(&mut *conn) + .await?; - Ok(update_result.rows_affected()) + Ok(update_result.rows_affected()) + }) + .await } /// Perform upkeep work related to status=failure @@ -929,39 +1001,69 @@ impl InflightActivationStore for SqliteActivationStore { /// complete. #[instrument(skip_all)] async fn handle_failed_tasks(&self) -> Result { - let mut atomic = self.write_pool.begin().await?; + retry_query( + &self.config.retry_config, + "handle_failed_tasks", + || async { + let mut atomic = self.write_pool.begin().await?; + + let failed_tasks: Vec = + sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") + .bind(InflightActivationStatus::Failure) + .fetch_all(&mut *atomic) + .await? + .into_iter() + .collect(); + + let mut forwarder = FailedTasksForwarder { + to_discard: vec![], + to_deadletter: vec![], + }; + + for record in failed_tasks.iter() { + let activation_data: &[u8] = record.get("activation"); + let id: String = record.get("id"); + let on_attempts_exceeded_val: i32 = record.get("on_attempts_exceeded"); + let on_attempts_exceeded: OnAttemptsExceeded = + on_attempts_exceeded_val.try_into().unwrap(); + if on_attempts_exceeded == OnAttemptsExceeded::Discard + || on_attempts_exceeded == OnAttemptsExceeded::Unspecified + { + forwarder.to_discard.push((id, activation_data.to_vec())) + } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { + forwarder.to_deadletter.push((id, activation_data.to_vec())) + } + } - let failed_tasks: Vec = - sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Failure) - .fetch_all(&mut *atomic) - .await? - .into_iter() - .collect(); - - let mut forwarder = FailedTasksForwarder { - to_discard: vec![], - to_deadletter: vec![], - }; - - for record in failed_tasks.iter() { - let activation_data: &[u8] = record.get("activation"); - let id: String = record.get("id"); - // We could be deadlettering because of activation.expires - // when a task expires we still deadletter if configured. - let on_attempts_exceeded_val: i32 = record.get("on_attempts_exceeded"); - let on_attempts_exceeded: OnAttemptsExceeded = - on_attempts_exceeded_val.try_into().unwrap(); - if on_attempts_exceeded == OnAttemptsExceeded::Discard - || on_attempts_exceeded == OnAttemptsExceeded::Unspecified - { - forwarder.to_discard.push((id, activation_data.to_vec())) - } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { - forwarder.to_deadletter.push((id, activation_data.to_vec())) - } - } + if !forwarder.to_discard.is_empty() { + let mut query_builder = + QueryBuilder::new("UPDATE inflight_taskactivations "); + query_builder + .push("SET status = ") + .push_bind(InflightActivationStatus::Complete) + .push(" WHERE id IN ("); + + let mut separated = query_builder.separated(", "); + for (id, _body) in forwarder.to_discard.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + + query_builder.build().execute(&mut *atomic).await?; + } - if !forwarder.to_discard.is_empty() { + atomic.commit().await?; + + Ok(forwarder) + }, + ) + .await + } + + /// Mark a collection of tasks as complete by id + #[instrument(skip_all)] + async fn mark_completed(&self, ids: Vec) -> Result { + retry_query(&self.config.retry_config, "mark_completed", || async { let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); query_builder .push("SET status = ") @@ -969,67 +1071,52 @@ impl InflightActivationStore for SqliteActivationStore { .push(" WHERE id IN ("); let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { + for id in ids.iter() { separated.push_bind(id); } separated.push_unseparated(")"); + let mut conn = self.acquire_write_conn_metric("mark_completed").await?; + let result = query_builder.build().execute(&mut *conn).await?; - query_builder.build().execute(&mut *atomic).await?; - } - - atomic.commit().await?; - - Ok(forwarder) - } - - /// Mark a collection of tasks as complete by id - #[instrument(skip_all)] - async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - separated.push_bind(id); - } - separated.push_unseparated(")"); - let mut conn = self.acquire_write_conn_metric("mark_completed").await?; - let result = query_builder.build().execute(&mut *conn).await?; - - Ok(result.rows_affected()) + Ok(result.rows_affected()) + }) + .await } /// Remove completed tasks. /// This method is a garbage collector for the inflight task store. #[instrument(skip_all)] async fn remove_completed(&self) -> Result { - let mut conn = self.acquire_write_conn_metric("remove_completed").await?; - let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Complete) - .execute(&mut *conn) - .await?; + retry_query(&self.config.retry_config, "remove_completed", || async { + let mut conn = self.acquire_write_conn_metric("remove_completed").await?; + let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") + .bind(InflightActivationStatus::Complete) + .execute(&mut *conn) + .await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } /// Remove killswitched tasks. #[instrument(skip_all)] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); - let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); - } - separated.push_unseparated(")"); - let mut conn = self - .acquire_write_conn_metric("remove_killswitched") - .await?; - let query = query_builder.build().execute(&mut *conn).await?; + retry_query(&self.config.retry_config, "remove_killswitched", || async { + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); + let mut separated = query_builder.separated(", "); + for taskname in killswitched_tasks.iter() { + separated.push_bind(taskname); + } + separated.push_unseparated(")"); + let mut conn = self + .acquire_write_conn_metric("remove_killswitched") + .await?; + let query = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + Ok(query.rows_affected()) + }) + .await } } diff --git a/src/store/retry.rs b/src/store/retry.rs new file mode 100644 index 00000000..03c994ed --- /dev/null +++ b/src/store/retry.rs @@ -0,0 +1,215 @@ +use std::future::Future; +use std::time::Duration; + +use anyhow::Error; +use tokio::time::sleep; +use tracing::{debug, warn}; + +use crate::config::Config; + +/// Configuration for query-level retry behavior. +pub struct RetryConfig { + pub max_retries: u32, + pub retry_delay: Duration, +} + +impl RetryConfig { + pub fn from_config(config: &Config) -> Self { + Self { + max_retries: config.db_query_max_retries.unwrap_or(0), + retry_delay: Duration::from_millis(config.db_query_retry_delay_ms), + } + } +} + +/// Returns true if the error is a transient database/connection error +/// that is likely to succeed on retry. Downcasts the anyhow::Error to +/// sqlx::Error to match on structured variants rather than parsing strings. +fn is_retryable_error(err: &Error) -> bool { + matches!( + err.downcast_ref::(), + Some(sqlx::Error::Io(_)) + | Some(sqlx::Error::PoolTimedOut) + | Some(sqlx::Error::PoolClosed) + | Some(sqlx::Error::WorkerCrashed) + ) +} + +/// Retries a query-producing closure on transient database errors. +/// +/// The closure `f` is called on each attempt, producing a fresh future. +/// This ensures connection re-acquisition and query re-execution happen +/// naturally on each retry. +pub async fn retry_query( + config: &RetryConfig, + label: &'static str, + f: F, +) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + let mut attempt = 0u32; + loop { + match f().await { + Ok(val) => { + if attempt > 0 { + debug!(label, attempt, "Query succeeded after retry"); + metrics::counter!("store.retry.succeeded", "method" => label).increment(1); + } + return Ok(val); + } + Err(err) if attempt < config.max_retries && is_retryable_error(&err) => { + warn!( + label, + attempt, + error = %err, + "Retryable database error, retrying" + ); + metrics::counter!("store.retry.attempt", "method" => label).increment(1); + sleep(config.retry_delay).await; + attempt += 1; + } + Err(err) => { + if attempt > 0 { + metrics::counter!("store.retry.exhausted", "method" => label).increment(1); + } + return Err(err); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + + fn retryable_error() -> Error { + Error::from(sqlx::Error::PoolTimedOut) + } + + fn non_retryable_error() -> Error { + Error::from(sqlx::Error::RowNotFound) + } + + fn test_config(max_retries: u32) -> RetryConfig { + RetryConfig { + max_retries, + retry_delay: Duration::from_millis(0), + } + } + + #[tokio::test] + async fn test_retry_succeeds_after_retryable_errors() { + let fail_count = AtomicU32::new(2); + let config = test_config(3); + + let result = retry_query(&config, "test", || async { + let remaining = fail_count.load(Ordering::SeqCst); + if remaining > 0 { + fail_count.fetch_sub(1, Ordering::SeqCst); + Err(retryable_error()) + } else { + Ok(42u64) + } + }) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + } + + #[tokio::test] + async fn test_retry_exhausted_surfaces_error() { + let fail_count = AtomicU32::new(5); + let config = test_config(3); + + let result = retry_query(&config, "test", || async { + let remaining = fail_count.load(Ordering::SeqCst); + if remaining > 0 { + fail_count.fetch_sub(1, Ordering::SeqCst); + Err(retryable_error()) + } else { + Ok(42u64) + } + }) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_non_retryable_error_not_retried() { + let call_count = AtomicU32::new(0); + let config = test_config(3); + + let result = retry_query(&config, "test", || async { + call_count.fetch_add(1, Ordering::SeqCst); + Err::(non_retryable_error()) + }) + .await; + + assert!(result.is_err()); + // Called only once — no retries for non-retryable errors + assert_eq!(call_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_zero_retries_surfaces_immediately() { + let config = test_config(0); + + let result = retry_query(&config, "test", || async { + Err::(retryable_error()) + }) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_success_on_first_attempt_no_retry() { + let config = test_config(3); + + let result = retry_query(&config, "test", || async { Ok(1u64) }).await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 1); + } + + #[test] + fn test_is_retryable_error() { + // Retryable: transient connection/pool errors + assert!(is_retryable_error(&Error::from(sqlx::Error::PoolTimedOut))); + assert!(is_retryable_error(&Error::from(sqlx::Error::PoolClosed))); + assert!(is_retryable_error(&Error::from(sqlx::Error::WorkerCrashed))); + assert!(is_retryable_error(&Error::from(sqlx::Error::Io( + std::io::Error::new(std::io::ErrorKind::ConnectionReset, "connection reset") + )))); + + // Not retryable: logic/schema errors + assert!(!is_retryable_error(&Error::from(sqlx::Error::RowNotFound))); + assert!(!is_retryable_error(&Error::from( + sqlx::Error::ColumnNotFound("id".into()) + ))); + assert!(!is_retryable_error(&Error::from(sqlx::Error::Protocol( + "unexpected".into() + )))); + } + + #[test] + fn test_config_defaults_to_zero_retries() { + let config = RetryConfig::from_config(&Arc::new(Config::default())); + assert_eq!(config.max_retries, 0); + } + + #[test] + fn test_config_uses_configured_retries() { + let config = RetryConfig::from_config(&Arc::new(Config { + db_query_max_retries: Some(5), + ..Config::default() + })); + assert_eq!(config.max_retries, 5); + } +} diff --git a/src/store/tests.rs b/src/store/tests.rs index 18df83aa..452dddb1 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -1928,6 +1928,10 @@ async fn test_db_status_calls_ok() { claim_lease_ms: 5000, vacuum_page_count: None, enable_sqlite_status_metrics: false, + retry_config: crate::store::retry::RetryConfig { + max_retries: 0, + retry_delay: std::time::Duration::from_millis(0), + }, }, ) .await