feat(taskbroker): Batch Status Updates #618
Conversation
Since we may want to treat claimed → processing updates the same way, I'm actually going to create a more general flusher helper.
```rust
/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
/// full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
```
I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.
```rust
_ = interval.tick() => {
```
This code now lives in flusher.rs, which contains a similar loop. This condition fires every interval_ms after the previous tick.
The flusher only handles the tick when select! actually chooses this branch. If messages keep arriving and the rx.recv() arm keeps winning before the tick is ready, the tick still advances in the background. When the tick is ready and this arm is selected, the buffer is flushed.
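To make that behaviour easier to follow, here is a minimal, hypothetical sketch of such a flusher loop, assuming tokio. The names and the exact signature (`run_flusher` here takes `max_batch`, `interval_ms`, and a `flush_with_pushback` helper) are illustrative, not the PR's actual code; the real implementation lives in flusher.rs.

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval;

/// Sketch of a generic flusher: buffer incoming items and flush them either
/// when the batch is full or when the flush interval has elapsed.
pub async fn run_flusher<T, F, Fut>(
    mut rx: mpsc::Receiver<T>,
    max_batch: usize,
    interval_ms: u64,
    mut flush: F,
) where
    F: FnMut(Vec<T>) -> Fut,
    Fut: std::future::Future<Output = Result<(), Vec<T>>>,
{
    let mut buffer: Vec<T> = Vec::with_capacity(max_batch);
    let mut ticker = interval(Duration::from_millis(interval_ms));

    loop {
        tokio::select! {
            // Only accept new items while the buffer is below the batch size,
            // so a failing or slow store cannot grow the buffer without bound.
            maybe_item = rx.recv(), if buffer.len() < max_batch => {
                match maybe_item {
                    Some(item) => buffer.push(item),
                    None => {
                        // Channel closed: attempt to flush whatever is left, then stop.
                        flush_with_pushback(&mut buffer, &mut flush).await;
                        break;
                    }
                }
                if buffer.len() >= max_batch {
                    flush_with_pushback(&mut buffer, &mut flush).await;
                }
            }
            // Fires every interval_ms after the previous tick. If the recv arm
            // keeps winning, the tick still advances in the background and the
            // buffer is flushed once this arm is finally selected.
            _ = ticker.tick() => {
                if !buffer.is_empty() {
                    flush_with_pushback(&mut buffer, &mut flush).await;
                }
            }
        }
    }
}

/// Drain the buffer and call `flush`; on failure the items are handed back so
/// the next attempt retries the identical batch.
async fn flush_with_pushback<T, F, Fut>(buffer: &mut Vec<T>, flush: &mut F)
where
    F: FnMut(Vec<T>) -> Fut,
    Fut: std::future::Future<Output = Result<(), Vec<T>>>,
{
    if buffer.is_empty() {
        return;
    }
    let batch = std::mem::take(buffer);
    if let Err(returned) = flush(batch).await {
        *buffer = returned;
    }
}
```

In this shape the semantics described above fall out of select!: the recv arm is simply disabled while the buffer is full, and a failed flush leaves the same batch in place to be retried on the next tick.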
```rust
for id in ids {
    buffer.push((id, status));
}
```
Let's say there is a DB issue: would we keep appending to the buffer indefinitely? I think we should add a limit after which we stop appending and retry against the DB.
This was actually dead code, but similar logic now lives elsewhere.
No, we only append to the buffer while it hasn't reached the desired batch size. So if there's a DB issue, here's what should happen:

- Timer runs out or buffer fills up → call `flush` (this function)
- As long as `flush` is running, the (now empty) buffer does not receive any more IDs
- Flush fails because the store is unresponsive or some other problem
- IDs are pushed back onto the buffer (which was emptied right before attempting the flush)
- Flush function exits
So if the DB has a problem, we will keep retrying the same batch of IDs over and over again until it succeeds.
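As a hedged illustration of that retry behaviour, this is how a status-update producer might be wired to the sketch above. `TaskStatus`, `Store`, and `set_status_batch` are made-up stand-ins, not taskbroker's actual types or API.

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

/// Hypothetical status type and store; taskbroker's real types differ.
#[derive(Clone, Debug)]
enum TaskStatus {
    Processing,
    Complete,
}

struct Store;

impl Store {
    /// Pretend batched write; a real store would issue a single statement
    /// covering every id in the batch.
    async fn set_status_batch(&self, batch: &[(String, TaskStatus)]) -> Result<(), ()> {
        println!("flushing {} status updates", batch.len());
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let store = Arc::new(Store);
    let (tx, rx) = mpsc::channel::<(String, TaskStatus)>(4096);

    // Flush every 50ms or every 500 buffered updates, whichever comes first.
    let flusher_store = store.clone();
    let flusher = tokio::spawn(run_flusher(rx, 500, 50, move |batch| {
        let store = flusher_store.clone();
        async move {
            let result = store.set_status_batch(&batch).await;
            match result {
                Ok(()) => Ok(()),
                // Hand the batch back so the flusher retries the same ids.
                Err(()) => Err(batch),
            }
        }
    }));

    // Producers only send (id, status) pairs; the flusher owns the batching.
    tx.send(("task-1".to_string(), TaskStatus::Complete))
        .await
        .unwrap();
    drop(tx); // closing the channel lets the flusher drain and exit
    flusher.await.unwrap();
}
```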
markstory left a comment:
Makes sense to me. I didn't see any paths that would lead to data loss. The additional queue could result in writes that were accepted from the client's perspective being lost during a crash. While we'll run the task an additional time, we shouldn't lose any data, as the activations will still be in postgres.
True. I also thought about it this way: whether I batch updates or not, the outcome is roughly the same -- a certain number of tasks are lost and executed a second time. The only difference is whether they are queued in the taskbroker or the taskworker.
```rust
_ = async {
    let start = Instant::now();
```
Not sure. I'm moving it back to where it was in #637.
```rust
for id in ids {
    buffer.push((id, status));
}
```
Let's say the request fails because the DB has a problem (not a transient one) or it is saturated. Are we going to pile up requests until the broker runs out of memory? This would leave the database in an inconsistent state that will be very hard to troubleshoot.
Do we go through this code only when the worker updates the status, or also when we claim tasks or reset them during upkeep? The upkeep and claim phases have an easier and more deterministic way to batch requests, so they should not go through this.
I don't think that'll happen because the buffer is bounded, meaning requests won't pile up. It's not obvious here because the logic that limits buffer size is in flusher.rs.
When I simulated various database issues during testing (such as high latency and connection reset errors), this did not appear to be a problem. Taskbroker was able to recover on its own as soon as those issues were resolved.
But it may be worth testing again because that was a while ago.
Correction... there was a bug in the buffering logic in flusher.rs that would result in unbounded growth. I'm fixing it in #637.

Linear
Completes STREAM-918
Description
On the usual workload of 100 millisecond tasks, with the new "claimed" status, we can do around 5K tasks per second in the sandbox. By batching status updates, we reduce DB load, making all queries take less time. This can increase throughput by 1K to 2K tasks per second.
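For context on why batching reduces DB load, a single parameterized statement per batch replaces one round trip per task. The following is only a sketch of what such a batched update could look like, assuming an sqlx-backed SQLite store; taskbroker's actual table, column names, and id types may differ.

```rust
use sqlx::SqlitePool;

/// Hypothetical batched status update: one statement for the whole batch
/// instead of one UPDATE per id. Table and column names are assumptions.
async fn set_status_batch(
    pool: &SqlitePool,
    ids: &[String],
    status: i64,
) -> Result<(), sqlx::Error> {
    if ids.is_empty() {
        return Ok(());
    }
    // Build "UPDATE inflight_activations SET status = ? WHERE id IN (?, ?, ...)"
    let placeholders = vec!["?"; ids.len()].join(", ");
    let sql = format!(
        "UPDATE inflight_activations SET status = ? WHERE id IN ({placeholders})"
    );
    let mut query = sqlx::query(&sql).bind(status);
    for id in ids {
        query = query.bind(id.as_str());
    }
    query.execute(pool).await?;
    Ok(())
}
```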