
feat(taskbroker): Batch Claimed → Processing Updates#637

Open
george-sentry wants to merge 5 commits into main from george/push-taskbroker/batch-dispatch-updates

Conversation

Member

@george-sentry george-sentry commented May 14, 2026

Linear

Completes STREAM-920

Description

Builds on #618, which introduced a generic "flusher" mechanism and configurable batched status updates (that is, status updates triggered by calls to the SetTaskStatus RPC endpoint).

Here, we use the same mechanism to batch claimed → processing updates, which I call "push" updates for short (hence names like push_update_batch_size, push_update_tx, and so on). It can be turned on and off using the batch_push_updates boolean flag.

Batching these updates should result in significantly higher throughput (several thousand tasks per second higher, depending on the workload and database).

Concerning data loss: what if taskbroker crashes while claimed → processing updates are still queued? Those updates are lost, meaning that even though the tasks were claimed and sent successfully, they were never marked processing. Three things can happen from there...

  1. If the worker completes a task fast enough, it will be updated to "completed" directly from "claimed" and later removed by upkeep ✅
  2. If the worker doesn't complete a task fast enough, it may be reset back to "pending" before the worker completes it. In that case, it will be updated from "pending" directly to "completed" and later removed by upkeep ✅
  3. If the worker doesn't complete a task fast enough and the taskbroker has enough time to reclaim and/or resend it, the task will be completed twice ✅ ✅

In all cases, tasks aren't dropped but may be repeated.

@george-sentry george-sentry requested a review from a team as a code owner May 14, 2026 23:16
@linear-code

linear-code Bot commented May 14, 2026

STREAM-920

Comment thread src/push/mod.rs
Comment thread src/fetch/mod.rs
debug!("Fetching next batch of pending activations...");
metrics::counter!("fetch.loop.count").increment(1);

let start = Instant::now();
Member Author


Starting the timer here results in more accurate metrics. It was already like this before, but I unintentionally moved it in #618.

Comment thread src/fetch/tests.rs
}

async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> {
async fn mark_processing(&self, _id: &str) -> Result<(), Error> {
Member Author


Minor naming change to match mark_completed and shorten things.

Comment thread src/push/mod.rs
// Are we batching claimed → processing updates?
if let Some(ref tx) = update_tx {
let result = tx.send(id.clone()).await;
metrics::histogram!("push.mark_processing.duration").record(start.elapsed());
Member Author


Even though we aren't calling mark_processing here, I think it would be nice to emit the same metric as when we do call that method. It will allow for easier comparison between batched and single updates.

Member Author


In other words, that naming is intentional.

Comment thread src/config.rs
pub push_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of dispatch updates.
pub push_update_interval_ms: u64,
Member Author


Once all this is done, we should probably reorganize the configuration. It has become enormous 😅

Comment thread src/push/mod.rs
Comment thread src/flusher.rs

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.


Reviewed by Cursor Bugbot for commit b2b6964.

Comment thread src/grpc/server.rs Outdated
Comment thread src/push/mod.rs Outdated
@george-sentry george-sentry force-pushed the george/push-taskbroker/batch-dispatch-updates branch from b2b6964 to 9c87519 on May 15, 2026 01:00