feat(taskbroker): Batch Claimed → Processing Updates #637
Conversation
```rust
debug!("Fetching next batch of pending activations...");
metrics::counter!("fetch.loop.count").increment(1);

let start = Instant::now();
```

Starting the timer here results in more accurate metrics. It was already like this before, but I unintentionally moved it in #618.
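To illustrate the point about timer placement: starting the `Instant` immediately before the measured work (rather than at the top of the loop iteration) keeps loop bookkeeping out of the recorded duration. A minimal sketch of that pattern, with a hypothetical `timed_fetch` helper not present in the PR:

```rust
use std::time::{Duration, Instant};

// Hypothetical helper: start the timer right before the work being
// measured, so the recorded duration covers only the fetch itself and
// not counters, logging, or other per-iteration bookkeeping.
fn timed_fetch<T>(fetch: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let result = fetch();
    (result, start.elapsed())
}
```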
```diff
-    async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> {
+    async fn mark_processing(&self, _id: &str) -> Result<(), Error> {
```

Minor naming change to match `mark_completed` and shorten things.
```rust
// Are we batching claimed → processing updates?
if let Some(ref tx) = update_tx {
    let result = tx.send(id.clone()).await;
    metrics::histogram!("push.mark_processing.duration").record(start.elapsed());
```

Even though we aren't calling `mark_processing` here, I think it would be nice to emit the same metric as when we do call that method. It will allow for easier comparison between batched and single updates.

In other words, that naming is intentional.
```rust
pub push_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of dispatch updates.
pub push_update_interval_ms: u64,
```

As soon as all this is done, we should probably reorganize the configuration. It has become enormous 😅
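For reference, a hypothetical configuration fragment showing how these knobs fit together. The field names come from this PR; the file format, layout, and values shown are assumptions for illustration only:

```yaml
# Enable batching of claimed → processing ("push") updates.
batch_push_updates: true
# Flush once this many updates are queued...
push_update_batch_size: 500
# ...or after this many milliseconds, whichever comes first.
push_update_interval_ms: 100
```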
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.

Linear
Completes STREAM-920
Description
Builds on #618, which introduced a generic "flusher" mechanism and configurable batched status updates (that is, status updates triggered by calls to the `SetTaskStatus` RPC endpoint).

Here, we use the same mechanism to batch claimed → processing updates, which I call "push" updates for short (hence names like `push_update_batch_size`, `push_update_tx`, and so on). It can be turned on and off using the `batch_push_updates` boolean flag.

Batching these updates should result in significantly higher throughput (several thousand tasks per second higher, depending on the workload and database).
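The flush-on-size-or-interval behavior described above can be sketched with a standard-library channel. This is a simplified, synchronous sketch under assumptions: the real implementation is async, and the names `flusher_loop` and `flush` are hypothetical, not taken from the codebase:

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

/// Collect task ids from a channel and hand them to `flush` in batches,
/// flushing when the batch is full or when the interval elapses.
fn flusher_loop(
    rx: mpsc::Receiver<String>,
    batch_size: usize,
    interval: Duration,
    mut flush: impl FnMut(&[String]),
) {
    let mut batch: Vec<String> = Vec::with_capacity(batch_size);
    let mut deadline = Instant::now() + interval;
    loop {
        // Wait at most until the flush deadline for the next id.
        let timeout = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            Ok(id) => {
                batch.push(id);
                if batch.len() >= batch_size {
                    flush(batch.as_slice());
                    batch.clear();
                    deadline = Instant::now() + interval;
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // Interval elapsed: flush whatever has accumulated.
                if !batch.is_empty() {
                    flush(batch.as_slice());
                    batch.clear();
                }
                deadline = Instant::now() + interval;
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                // Senders are gone: flush the remainder and stop.
                if !batch.is_empty() {
                    flush(batch.as_slice());
                }
                return;
            }
        }
    }
}
```

The throughput win comes from the flush callback issuing one batched database UPDATE per group of ids instead of one round trip per task.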
Concerning data loss: what if taskbroker crashes while claimed → processing updates are queued? Those updates are lost, meaning that even though those tasks were claimed and sent successfully, they were never updated to processing. Three things can happen here...
In all cases, tasks aren't dropped but may be repeated.