From e0c6fcc62c6cc6a0bde58649191b1588652d3f1b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 11 May 2026 16:01:57 +0200 Subject: [PATCH 1/2] feat(taskbroker): Implement retry support for raw topics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add retry support for raw/passthrough topics (e.g. `ingest-events`) where tasks don't have retry_state embedded in the message. Changes: - Config: Add `kafka_retry_topic` option for dedicated retry topic - Store: Add `update_retry_state` method to update activation's retry_state - gRPC: Handle `max_retries` in SetTaskStatus, call store.update_retry_state - Upkeep: Route retries to dedicated retry topic when configured - Consumer: Subscribe to both main and retry topics - Deserializer: Topic-aware routing (retry topic always uses activation deserializer) - Python client: Extract max_retries from Retry config, send in SetTaskStatusRequest When a worker reports RETRY status with max_retries, the broker updates the activation's retry_state and routes the retry to the dedicated retry topic. This prevents retries from polluting the main topic where other consumers (like SBC) can't parse activations. See https://www.notion.so/3448b10e4b5d80e7a1efee6145d504c2 → Stage 4 Depends on: https://github.com/getsentry/sentry-protos/pull/251 ref STREAM-981 Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 7 ++- Cargo.toml | 2 +- clients/python/src/taskbroker_client/types.py | 1 + .../src/taskbroker_client/worker/client.py | 2 + .../taskbroker_client/worker/workerchild.py | 4 ++ src/config.rs | 6 +++ src/fetch/tests.rs | 4 ++ src/grpc/server.rs | 10 +++++ src/grpc/server_tests.rs | 6 +++ src/kafka/deserialize.rs | 14 ++++++ src/main.rs | 22 +++++++++- src/push/tests.rs | 3 ++ src/store/adapters/postgres.rs | 43 ++++++++++++++++++- src/store/adapters/sqlite.rs | 41 +++++++++++++++++- src/store/traits.rs | 4 ++ src/upkeep.rs | 10 ++++- 16 files changed, 168 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 387eb531..0f5dd020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1459,7 +1459,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2677,9 +2677,8 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.8.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60dfb8c1b03c3f6e800a91eca7daea05205dd87f63b8d70b50b7e2211a2e0be2" +version = "0.8.29" +source = "git+https://github.com/getsentry/sentry-protos?branch=feat%2Ftaskbroker-max-retries#fb9fadfa88bf6ea3eff26b7d5b068b4e6c3869f1" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 6e618f47..4ad45bfb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ sentry = { version = "0.41.0", default-features = false, features = [ "tracing", "logs" ] } -sentry_protos = "0.8.13" +sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "feat/taskbroker-max-retries" } serde = "1.0.214" serde_bytes = "0.11" serde_yaml = "0.9.34" diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index 326ddd96..c0acdba4 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -68,3 +68,4 @@ class ProcessingResult: status: TaskActivationStatus.ValueType host: str receive_timestamp: float + max_retries: int | None = None diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 2a1c56b2..125c31cd 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -445,6 +445,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, + max_retries=processing_result.max_retries, # type: ignore[call-arg] ) try: @@ -566,6 +567,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=None, + max_retries=processing_result.max_retries, # type: ignore[call-arg] ) retries = 0 diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index fa2e8476..72e77a7e 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -234,6 +234,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE + max_retries_val: int | None = None # Use time.time() so we can measure against activation.received_at execution_start_time = time.time() try: @@ -261,6 +262,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: retry = task_func.retry if retry and retry.should_retry(inflight.activation.retry_state, err): next_state = TASK_ACTIVATION_STATUS_RETRY + max_retries_val = retry._times else: next_state = TASK_ACTIVATION_STATUS_FAILURE except Exception as err: @@ -279,6 +281,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: }, ) next_state = TASK_ACTIVATION_STATUS_RETRY + max_retries_val = retry._times elif retry.max_attempts_reached(inflight.activation.retry_state): with sentry_sdk.isolation_scope() as scope: if should_capture_error: @@ -321,6 +324,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, + max_retries=max_retries_val, ) ) diff --git a/src/config.rs b/src/config.rs index 700f1a5a..f15ace99 100644 --- a/src/config.rs +++ b/src/config.rs @@ -129,6 +129,11 @@ pub struct Config { /// The location to the DLQ private key file pub kafka_deadletter_ssl_key_location: Option, + /// The topic to publish retry task activations to. + /// When set, retries go to this topic instead of kafka_topic. + /// Required for raw_mode where the main topic has other consumers. + pub kafka_retry_topic: Option, + /// The default number of partitions for a topic pub default_topic_partitions: i32, @@ -362,6 +367,7 @@ impl Default for Config { kafka_deadletter_ssl_ca_location: None, kafka_deadletter_ssl_certificate_location: None, kafka_deadletter_ssl_key_location: None, + kafka_retry_topic: None, default_topic_partitions: 1, kafka_session_timeout_ms: 6000, kafka_auto_commit_interval_ms: 5000, diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 0d11da63..6260cba7 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -122,6 +122,10 @@ impl InflightActivationStore for MockStore { unimplemented!() } + async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> Result<(), Error> { + unimplemented!() + } + async fn set_processing_deadline( &self, _id: &str, diff --git a/src/grpc/server.rs b/src/grpc/server.rs index cf4a6bdd..379f5577 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -101,6 +101,16 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } + // If status is Retry and max_retries is provided, update the activation's retry_state. + // This allows workers to communicate retry policy for tasks from raw topics. + if status == InflightActivationStatus::Retry { + if let Some(max_retries) = request.get_ref().max_retries { + if let Err(e) = self.store.update_retry_state(&id, max_retries).await { + error!(?id, ?max_retries, "Failed to update retry state: {:?}", e); + } + } + } + match self.store.set_status(&id, status).await { Ok(Some(_)) => metrics::counter!( "grpc_server.set_status", diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 2b986d66..4098a3f4 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -68,6 +68,7 @@ async fn test_set_task_status(#[case] adapter: &str) { id: "test_task".to_string(), status: 5, // Complete fetch_next_task: None, + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -89,6 +90,7 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) { id: "test_task".to_string(), status: 1, // Invalid fetch_next_task: None, + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_err()); @@ -221,6 +223,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) { namespace: None, application: None, }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -256,6 +259,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) { application: Some("hammers".into()), namespace: None, }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -296,6 +300,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { application: Some("no-matches".into()), namespace: None, }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -324,6 +329,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte application: None, namespace: Some(namespace), }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 0a6d0919..3679b1fd 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Error; +use rdkafka::Message; use rdkafka::message::OwnedMessage; use crate::config::Config; @@ -12,6 +13,8 @@ use super::deserialize_raw::{self, RawConfig}; pub struct DeserializeConfig { activation_config: DeserializeActivationConfig, raw_config: Option, + /// Retry topic always contains activations, even in raw_mode. + retry_topic: Option, } impl DeserializeConfig { @@ -19,6 +22,7 @@ impl DeserializeConfig { Self { activation_config: DeserializeActivationConfig::from_config(config), raw_config: RawConfig::from_config(config), + retry_topic: config.kafka_retry_topic.clone(), } } } @@ -26,13 +30,23 @@ impl DeserializeConfig { /// Create a unified deserializer that handles both normal and raw modes. /// In raw mode, raw Kafka bytes are wrapped into a TaskActivation. /// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos. +/// Messages from the retry topic are always deserialized as activations. pub fn new( config: DeserializeConfig, ) -> impl Fn(Arc) -> Result { let raw_deserializer = config.raw_config.map(deserialize_raw::new); let activation_deserializer = deserialize_activation::new(config.activation_config); + let retry_topic = config.retry_topic; move |msg: Arc| { + // Messages from the retry topic are always activations + if let Some(ref retry_topic) = retry_topic { + if msg.topic() == retry_topic { + return activation_deserializer(msg); + } + } + + // For main topic: use raw deserializer in raw_mode, else activation deserializer if let Some(ref raw_deserializer) = raw_deserializer { raw_deserializer(msg) } else { diff --git a/src/main.rs b/src/main.rs index c91a766f..f907f726 100644 --- a/src/main.rs +++ b/src/main.rs @@ -86,11 +86,21 @@ async fn main() -> Result<(), Error> { if config.create_missing_topics { let kafka_client_config = config.kafka_consumer_config(); create_missing_topics( - kafka_client_config, + kafka_client_config.clone(), &config.kafka_topic, config.default_topic_partitions, ) .await?; + + // Create retry topic if configured + if let Some(ref retry_topic) = config.kafka_retry_topic { + create_missing_topics( + kafka_client_config, + retry_topic, + config.default_topic_partitions, + ) + .await?; + } } if config.full_vacuum_on_start { @@ -158,11 +168,19 @@ async fn main() -> Result<(), Error> { let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); + + // Build list of topics to consume from + let mut topics_to_consume = vec![consumer_config.kafka_topic.clone()]; + if let Some(ref retry_topic) = consumer_config.kafka_retry_topic { + topics_to_consume.push(retry_topic.clone()); + } + async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need // an outer select here like the other tasks. + let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect(); start_consumer( - &[&consumer_config.kafka_topic], + &topic_refs, &consumer_config.kafka_consumer_config(), consumer_store.clone(), processing_strategy!({ diff --git a/src/push/tests.rs b/src/push/tests.rs index ca479a6d..3349bdee 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -103,6 +103,9 @@ impl InflightActivationStore for MockStore { ) -> anyhow::Result> { Ok(None) } + async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> anyhow::Result<()> { + Ok(()) + } async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { 0.0 } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 8f087b59..4590a800 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -11,7 +11,8 @@ use anyhow::{Error, anyhow}; use async_backtrace::framed; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use prost::Message; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -654,6 +655,46 @@ impl InflightActivationStore for PostgresActivationStore { Ok(Some(row.into())) } + #[instrument(skip_all)] + #[framed] + async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> { + let mut conn = self.acquire_write_conn_metric("update_retry_state").await?; + + // Fetch the current activation + let row: Option = sqlx::query_as( + "SELECT *, kafka_offset AS offset FROM inflight_taskactivations WHERE id = $1", + ) + .bind(id) + .fetch_optional(&mut *conn) + .await?; + + let Some(row) = row else { + return Ok(()); + }; + + // Decode the activation, update retry_state, re-encode + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + + let retry_state = activation.retry_state.get_or_insert_with(|| RetryState { + attempts: 0, + max_attempts: 0, + on_attempts_exceeded: OnAttemptsExceeded::Discard.into(), + delay_on_retry: None, + at_most_once: Some(false), + }); + retry_state.max_attempts = max_retries; + + let updated_bytes = activation.encode_to_vec(); + + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(updated_bytes) + .bind(id) + .execute(&mut *conn) + .await?; + + Ok(()) + } + #[instrument(skip_all)] #[framed] async fn set_processing_deadline( diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 8692ac0c..9bce8367 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -20,7 +20,8 @@ use libsqlite3_sys::{ SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED, SQLITE_OK, sqlite3_db_status, }; -use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use prost::Message; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -706,6 +707,44 @@ impl InflightActivationStore for SqliteActivationStore { Ok(Some(row.into())) } + #[instrument(skip_all)] + async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> { + let mut conn = self.acquire_write_conn_metric("update_retry_state").await?; + + // Fetch the current activation + let row: Option = + sqlx::query_as("SELECT * FROM inflight_taskactivations WHERE id = $1") + .bind(id) + .fetch_optional(&mut *conn) + .await?; + + let Some(row) = row else { + return Ok(()); + }; + + // Decode the activation, update retry_state, re-encode + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + + let retry_state = activation.retry_state.get_or_insert_with(|| RetryState { + attempts: 0, + max_attempts: 0, + on_attempts_exceeded: OnAttemptsExceeded::Discard.into(), + delay_on_retry: None, + at_most_once: Some(false), + }); + retry_state.max_attempts = max_retries; + + let updated_bytes = activation.encode_to_vec(); + + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(updated_bytes) + .bind(id) + .execute(&mut *conn) + .await?; + + Ok(()) + } + #[instrument(skip_all)] async fn set_processing_deadline( &self, diff --git a/src/store/traits.rs b/src/store/traits.rs index c21a9d41..1f4a0e97 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -78,6 +78,10 @@ pub trait InflightActivationStore: Send + Sync { status: InflightActivationStatus, ) -> Result, Error>; + /// Update the retry_state in the activation blob. + /// Called when worker provides max_retries with a Retry status. + async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error>; + /// COUNT OPERATIONS /// Get the age of the oldest pending activation in seconds async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; diff --git a/src/upkeep.rs b/src/upkeep.rs index 5336f063..23c8f00d 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -151,20 +151,26 @@ pub async fn do_upkeep( // 1. Handle retry tasks let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { + // Use retry topic if configured, otherwise fall back to main topic + let retry_target_topic = config + .kafka_retry_topic + .as_ref() + .unwrap_or(&config.kafka_topic); + // 2. Append retries to kafka let deliveries = retries .into_iter() .map(|inflight| { let producer = producer.clone(); let config = config.clone(); + let target_topic = retry_target_topic.clone(); async move { let activation = TaskActivation::decode(&inflight.activation as &[u8]).unwrap(); let serialized = create_retry_activation(&activation).encode_to_vec(); let delivery = producer .send( - FutureRecord::<(), Vec>::to(&config.kafka_topic) - .payload(&serialized), + FutureRecord::<(), Vec>::to(&target_topic).payload(&serialized), Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)), ) .await; From 95413d00cb2e8fd88c279cf0116a2fefdd11481a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 11 May 2026 16:08:01 +0200 Subject: [PATCH 2/2] fix: Convert max_retries to max_attempts correctly max_retries (from Python's @task decorator) excludes the initial attempt, while max_attempts includes it. Add 1 when storing to retry_state. Example: @task(max_retries=3) means 4 total attempts (1 initial + 3 retries) Co-Authored-By: Claude Opus 4.5 --- src/store/adapters/postgres.rs | 3 ++- src/store/adapters/sqlite.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 4590a800..6c39428d 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -682,7 +682,8 @@ impl InflightActivationStore for PostgresActivationStore { delay_on_retry: None, at_most_once: Some(false), }); - retry_state.max_attempts = max_retries; + // max_retries excludes initial attempt, max_attempts includes it + retry_state.max_attempts = max_retries + 1; let updated_bytes = activation.encode_to_vec(); diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 9bce8367..fd50e7f5 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -732,7 +732,8 @@ impl InflightActivationStore for SqliteActivationStore { delay_on_retry: None, at_most_once: Some(false), }); - retry_state.max_attempts = max_retries; + // max_retries excludes initial attempt, max_attempts includes it + retry_state.max_attempts = max_retries + 1; let updated_bytes = activation.encode_to_vec();