Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ sentry = { version = "0.41.0", default-features = false, features = [
"tracing",
"logs"
] }
sentry_protos = "0.8.13"
sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "feat/taskbroker-max-retries" }
serde = "1.0.214"
serde_bytes = "0.11"
serde_yaml = "0.9.34"
Expand Down
1 change: 1 addition & 0 deletions clients/python/src/taskbroker_client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ class ProcessingResult:
status: TaskActivationStatus.ValueType
host: str
receive_timestamp: float
max_retries: int | None = None
2 changes: 2 additions & 0 deletions clients/python/src/taskbroker_client/worker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def update_task(
id=processing_result.task_id,
status=processing_result.status,
fetch_next_task=fetch_next_task,
max_retries=processing_result.max_retries, # type: ignore[call-arg]
)

try:
Expand Down Expand Up @@ -566,6 +567,7 @@ def update_task(
id=processing_result.task_id,
status=processing_result.status,
fetch_next_task=None,
max_retries=processing_result.max_retries, # type: ignore[call-arg]
)

retries = 0
Expand Down
4 changes: 4 additions & 0 deletions clients/python/src/taskbroker_client/worker/workerchild.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
set_current_task(inflight.activation)

next_state = TASK_ACTIVATION_STATUS_FAILURE
max_retries_val: int | None = None
# Use time.time() so we can measure against activation.received_at
execution_start_time = time.time()
try:
Expand Down Expand Up @@ -261,6 +262,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
retry = task_func.retry
if retry and retry.should_retry(inflight.activation.retry_state, err):
next_state = TASK_ACTIVATION_STATUS_RETRY
max_retries_val = retry._times
else:
next_state = TASK_ACTIVATION_STATUS_FAILURE
except Exception as err:
Expand All @@ -279,6 +281,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
},
)
next_state = TASK_ACTIVATION_STATUS_RETRY
max_retries_val = retry._times
elif retry.max_attempts_reached(inflight.activation.retry_state):
with sentry_sdk.isolation_scope() as scope:
if should_capture_error:
Expand Down Expand Up @@ -321,6 +324,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
status=next_state,
host=inflight.host,
receive_timestamp=inflight.receive_timestamp,
max_retries=max_retries_val,
)
)

Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ pub struct Config {
/// The location to the DLQ private key file
pub kafka_deadletter_ssl_key_location: Option<String>,

/// The topic to publish retry task activations to.
/// When set, retries go to this topic instead of kafka_topic.
/// Required for raw_mode where the main topic has other consumers.
pub kafka_retry_topic: Option<String>,

/// The default number of partitions for a topic
pub default_topic_partitions: i32,

Expand Down Expand Up @@ -362,6 +367,7 @@ impl Default for Config {
kafka_deadletter_ssl_ca_location: None,
kafka_deadletter_ssl_certificate_location: None,
kafka_deadletter_ssl_key_location: None,
kafka_retry_topic: None,
default_topic_partitions: 1,
kafka_session_timeout_ms: 6000,
kafka_auto_commit_interval_ms: 5000,
Expand Down
4 changes: 4 additions & 0 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl InflightActivationStore for MockStore {
unimplemented!()
}

// MockStore stub: the fetch tests never exercise retry-state updates, so any
// call reaching this method indicates a test bug and should panic loudly.
async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> Result<(), Error> {
    unimplemented!()
}

async fn set_processing_deadline(
&self,
_id: &str,
Expand Down
10 changes: 10 additions & 0 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ impl ConsumerService for TaskbrokerServer {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

// If status is Retry and max_retries is provided, update the activation's retry_state.
// This allows workers to communicate retry policy for tasks from raw topics.
if status == InflightActivationStatus::Retry {
if let Some(max_retries) = request.get_ref().max_retries {
if let Err(e) = self.store.update_retry_state(&id, max_retries).await {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often will this happen? If it happens on every request, throughput will drop significantly. If it happens only occasionally, it should be fine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on every retry, we need to update the task activation. but i think i'll redesign this so that the task activation is updated in one go (max_retries and retry_count are updated using the same update stmt)

error!(?id, ?max_retries, "Failed to update retry state: {:?}", e);
}
}
}

match self.store.set_status(&id, status).await {
Ok(Some(_)) => metrics::counter!(
"grpc_server.set_status",
Expand Down
6 changes: 6 additions & 0 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ async fn test_set_task_status(#[case] adapter: &str) {
id: "test_task".to_string(),
status: 5, // Complete
fetch_next_task: None,
max_retries: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand All @@ -89,6 +90,7 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) {
id: "test_task".to_string(),
status: 1, // Invalid
fetch_next_task: None,
max_retries: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_err());
Expand Down Expand Up @@ -221,6 +223,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) {
namespace: None,
application: None,
}),
max_retries: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand Down Expand Up @@ -256,6 +259,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) {
application: Some("hammers".into()),
namespace: None,
}),
max_retries: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand Down Expand Up @@ -296,6 +300,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) {
application: Some("no-matches".into()),
namespace: None,
}),
max_retries: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand Down Expand Up @@ -324,6 +329,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte
application: None,
namespace: Some(namespace),
}),
max_retries: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand Down
14 changes: 14 additions & 0 deletions src/kafka/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use anyhow::Error;
use rdkafka::Message;
use rdkafka::message::OwnedMessage;

use crate::config::Config;
Expand All @@ -12,27 +13,40 @@ use super::deserialize_raw::{self, RawConfig};
pub struct DeserializeConfig {
    /// Settings for decoding messages that already contain encoded
    /// TaskActivation protos (the normal-mode path).
    activation_config: DeserializeActivationConfig,
    /// Present only when raw_mode is configured; used to wrap raw Kafka
    /// bytes from the main topic into TaskActivations.
    raw_config: Option<RawConfig>,
    /// Retry topic always contains activations, even in raw_mode.
    retry_topic: Option<String>,
}

impl DeserializeConfig {
    /// Derive deserializer settings from the application config: the
    /// activation and raw sub-configs plus the optional retry topic name.
    pub fn from_config(config: &Config) -> Self {
        let activation_config = DeserializeActivationConfig::from_config(config);
        let raw_config = RawConfig::from_config(config);
        let retry_topic = config.kafka_retry_topic.clone();
        Self {
            activation_config,
            raw_config,
            retry_topic,
        }
    }
}

/// Create a unified deserializer that handles both normal and raw modes.
/// In raw mode, raw Kafka bytes are wrapped into a TaskActivation.
/// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos.
/// Messages from the retry topic are always deserialized as activations.
pub fn new(
config: DeserializeConfig,
) -> impl Fn(Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
let raw_deserializer = config.raw_config.map(deserialize_raw::new);
let activation_deserializer = deserialize_activation::new(config.activation_config);
let retry_topic = config.retry_topic;

move |msg: Arc<OwnedMessage>| {
// Messages from the retry topic are always activations
if let Some(ref retry_topic) = retry_topic {
if msg.topic() == retry_topic {
return activation_deserializer(msg);
}
}

// For main topic: use raw deserializer in raw_mode, else activation deserializer
if let Some(ref raw_deserializer) = raw_deserializer {
raw_deserializer(msg)
} else {
Expand Down
22 changes: 20 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,21 @@ async fn main() -> Result<(), Error> {
if config.create_missing_topics {
let kafka_client_config = config.kafka_consumer_config();
create_missing_topics(
kafka_client_config,
kafka_client_config.clone(),
&config.kafka_topic,
config.default_topic_partitions,
)
.await?;

// Create retry topic if configured
if let Some(ref retry_topic) = config.kafka_retry_topic {
create_missing_topics(
kafka_client_config,
retry_topic,
config.default_topic_partitions,
)
.await?;
}
}

if config.full_vacuum_on_start {
Expand Down Expand Up @@ -158,11 +168,19 @@ async fn main() -> Result<(), Error> {
let consumer_store = store.clone();
let consumer_config = config.clone();
let runtime_config_manager = runtime_config_manager.clone();

// Build list of topics to consume from
let mut topics_to_consume = vec![consumer_config.kafka_topic.clone()];
if let Some(ref retry_topic) = consumer_config.kafka_retry_topic {
topics_to_consume.push(retry_topic.clone());
}

async move {
// The consumer has an internal thread that listens for cancellations, so it doesn't need
// an outer select here like the other tasks.
let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect();
start_consumer(
&[&consumer_config.kafka_topic],
&topic_refs,
&consumer_config.kafka_consumer_config(),
consumer_store.clone(),
processing_strategy!({
Expand Down
3 changes: 3 additions & 0 deletions src/push/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ impl InflightActivationStore for MockStore {
) -> anyhow::Result<Option<InflightActivation>> {
Ok(None)
}
// MockStore stub: the push tests don't verify retry-state persistence, so the
// call is accepted as a successful no-op rather than panicking.
async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> anyhow::Result<()> {
    Ok(())
}
async fn pending_activation_max_lag(&self, _now: &DateTime<Utc>) -> f64 {
0.0
}
Expand Down
44 changes: 43 additions & 1 deletion src/store/adapters/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use anyhow::{Error, anyhow};
use async_backtrace::framed;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sentry_protos::taskbroker::v1::OnAttemptsExceeded;
use prost::Message;
use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation};
use tracing::{instrument, warn};

use crate::config::Config;
Expand Down Expand Up @@ -654,6 +655,47 @@ impl InflightActivationStore for PostgresActivationStore {
Ok(Some(row.into()))
}

/// Patch the stored activation proto so its `retry_state.max_attempts`
/// reflects the worker-supplied retry budget.
///
/// `max_retries` counts retries after the initial attempt, while protobuf
/// `max_attempts` counts every attempt, hence the `+ 1` when storing.
/// A missing row is a silent no-op — the activation may already have
/// completed and been removed.
///
/// NOTE(review): the SELECT + UPDATE pair is not wrapped in a transaction,
/// so a concurrent writer could interleave — confirm callers serialize
/// updates per task id.
#[instrument(skip_all)]
#[framed]
async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> {
    let mut conn = self.acquire_write_conn_metric("update_retry_state").await?;

    // Load the stored activation row for this task, if still inflight.
    let fetched: Option<TableRow> = sqlx::query_as(
        "SELECT *, kafka_offset AS offset FROM inflight_taskactivations WHERE id = $1",
    )
    .bind(id)
    .fetch_optional(&mut *conn)
    .await?;

    let record = match fetched {
        Some(record) => record,
        // Nothing to do: the activation is gone (e.g. already finished).
        None => return Ok(()),
    };

    // Proto round-trip: decode, patch retry_state in place, re-encode.
    let mut activation = TaskActivation::decode(&record.activation as &[u8])?;
    activation
        .retry_state
        .get_or_insert_with(|| RetryState {
            attempts: 0,
            max_attempts: 0,
            on_attempts_exceeded: OnAttemptsExceeded::Discard.into(),
            delay_on_retry: None,
            at_most_once: Some(false),
        })
        // max_retries excludes the initial attempt; max_attempts includes it.
        .max_attempts = max_retries + 1;

    sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2")
        .bind(activation.encode_to_vec())
        .bind(id)
        .execute(&mut *conn)
        .await?;

    Ok(())
}

#[instrument(skip_all)]
#[framed]
async fn set_processing_deadline(
Expand Down
42 changes: 41 additions & 1 deletion src/store/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use libsqlite3_sys::{
SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED,
SQLITE_OK, sqlite3_db_status,
};
use sentry_protos::taskbroker::v1::OnAttemptsExceeded;
use prost::Message;
use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation};
use tracing::{instrument, warn};

use crate::config::Config;
Expand Down Expand Up @@ -706,6 +707,45 @@ impl InflightActivationStore for SqliteActivationStore {
Ok(Some(row.into()))
}

/// Persist a worker-supplied retry budget into the stored activation proto.
///
/// `max_retries` counts retries after the first attempt; the proto's
/// `max_attempts` counts every attempt, so `max_retries + 1` is stored.
/// If the row no longer exists (task already completed or removed) this
/// is a silent no-op.
///
/// NOTE(review): read-modify-write without a transaction — confirm that
/// concurrent updates to the same task id cannot interleave here.
#[instrument(skip_all)]
async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> {
    let mut conn = self.acquire_write_conn_metric("update_retry_state").await?;

    // Read the current activation bytes; bail out quietly if the task is gone.
    let Some(stored) = sqlx::query_as::<_, TableRow>(
        "SELECT * FROM inflight_taskactivations WHERE id = $1",
    )
    .bind(id)
    .fetch_optional(&mut *conn)
    .await?
    else {
        return Ok(());
    };

    // Decode the stored proto so retry_state can be patched in place.
    let mut activation = TaskActivation::decode(&stored.activation as &[u8])?;

    let retry_state = activation.retry_state.get_or_insert_with(|| RetryState {
        attempts: 0,
        max_attempts: 0,
        on_attempts_exceeded: OnAttemptsExceeded::Discard.into(),
        delay_on_retry: None,
        at_most_once: Some(false),
    });
    // max_retries excludes the initial attempt, max_attempts includes it.
    retry_state.max_attempts = max_retries + 1;

    // Write the re-encoded proto back over the old blob.
    sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2")
        .bind(activation.encode_to_vec())
        .bind(id)
        .execute(&mut *conn)
        .await?;

    Ok(())
}

#[instrument(skip_all)]
async fn set_processing_deadline(
&self,
Expand Down
Loading
Loading