feat(taskbroker): Batch Status Updates#618

Merged

george-sentry merged 15 commits into main from george/push-taskbroker/batch-updates on May 14, 2026

Conversation

@george-sentry (Member) commented Apr 30, 2026

Linear

Completes STREAM-918

Description

On the usual workload of 100 millisecond tasks, with the new "claimed" status, we can do around 5K tasks per second in the sandbox. By batching status updates, we reduce DB load, making all queries take less time. This can increase throughput by 1K to 2K tasks per second.
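As a sketch of the idea (sqlx and the table/column names here are placeholders, not taskbroker's actual schema), a batched write turns N status updates into a single round trip:

    use sqlx::PgPool;

    // Illustrative only: one statement updates the whole batch instead of
    // issuing a query per task.
    async fn set_task_statuses(
        pool: &PgPool,
        ids: &[i64],
        status: i32,
    ) -> Result<(), sqlx::Error> {
        sqlx::query("UPDATE tasks SET status = $1 WHERE id = ANY($2)")
            .bind(status)
            .bind(ids)
            .execute(pool)
            .await?;
        Ok(())
    }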

@george-sentry george-sentry requested a review from a team as a code owner April 30, 2026 21:02
@linear-code (Bot) commented Apr 30, 2026
@george-sentry george-sentry marked this pull request as draft April 30, 2026 23:24
@george-sentry (Member, Author) commented:

Since we may want to treat claimed → processing updates the same way, I'm actually going to create a more general Flusher struct that can be used by both push threads and the gRPC server.

@george-sentry george-sentry marked this pull request as ready for review May 1, 2026 08:02
Comment thread src/flusher.rs
/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
/// full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
@george-sentry (Member, Author) commented:

I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.
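
For context, a minimal sketch of what such a generic flusher loop could look like (parameter names and bounds here are assumptions; the real signature lives in src/flusher.rs):

    use std::future::Future;
    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::interval;

    pub async fn run_flusher<T, F, Fut>(
        mut rx: mpsc::Receiver<T>,
        batch_size: usize,
        flush_interval: Duration,
        mut flush: F,
    ) where
        F: FnMut(Vec<T>) -> Fut,
        Fut: Future<Output = ()>,
    {
        let mut buffer: Vec<T> = Vec::with_capacity(batch_size);
        let mut ticker = interval(flush_interval);
        loop {
            tokio::select! {
                maybe_item = rx.recv() => match maybe_item {
                    Some(item) => {
                        buffer.push(item);
                        // Flush early once the batch is full.
                        if buffer.len() >= batch_size {
                            flush(std::mem::take(&mut buffer)).await;
                        }
                    }
                    // Channel closed: drain whatever is left and exit.
                    None => break,
                },
                // Flush whatever has accumulated when the interval elapses.
                _ = ticker.tick() => {
                    if !buffer.is_empty() {
                        flush(std::mem::take(&mut buffer)).await;
                    }
                }
            }
        }
        if !buffer.is_empty() {
            flush(buffer).await;
        }
    }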

@george-sentry george-sentry changed the title from "feat(taskbroker): Batch Status Updates and Delete Completed Tasks Immediately" to "feat(taskbroker): Batch Status Updates" on May 1, 2026
Comment thread src/grpc/status_flusher.rs Outdated
}
}

_ = interval.tick() => {
Review comment:

When does this trigger?

@george-sentry (Member, Author) replied:

This code now lives in flusher.rs, but it contains a similar loop. This condition triggers every interval_ms after the previous tick.

The flusher only handles the tick when select! actually chooses this branch. If messages keep arriving and the rx.recv() arm keeps winning before the tick is ready, the tick still advances in the background. When the tick is ready and this arm is selected, the buffer is flushed.
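
As a self-contained illustration of those tick semantics (plain tokio, not taskbroker code):

    use std::time::Duration;
    use tokio::time::{interval, MissedTickBehavior};

    #[tokio::main]
    async fn main() {
        let mut ticker = interval(Duration::from_millis(100));
        // The default behavior (Burst) fires missed ticks back-to-back to
        // catch up; Skip drops them instead, avoiding a burst of flushes
        // after a long await has monopolized the loop.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        ticker.tick().await; // the first tick completes immediately
        ticker.tick().await; // roughly 100 ms after the first
    }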

Comment thread src/grpc/status_flusher.rs Outdated
Comment on lines +124 to +126
for id in ids {
buffer.push((id, status));
}
Review comment:
Let's say there is a DB issue, would we keep appending to the buffer indefinitely? I think we should add a limit after which we stop and retry on the DB.

@george-sentry (Member, Author) replied:

This was actually dead code, but similar logic now lives elsewhere.

No, we only append to the buffer while it hasn't reached the desired batch size. So if there's a DB issue, here's what should happen.

  1. Timer runs out or buffer fills up → call flush (this function)
  2. As long as flush is running, the (now empty) buffer does not receive any more IDs
  3. Flush fails because store is unresponsive or some other problem
  4. IDs are pushed back onto the buffer (which was emptied right before attempting the flush)
  5. Flush function exits

So if the DB has a problem, we will keep retrying the same batch of IDs over and over again until it succeeds.
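
A minimal sketch of steps 1 to 5 (Store, TaskStatus, and set_task_statuses are hypothetical stand-ins, not the actual taskbroker API):

    use std::io;

    // Hypothetical stand-ins for illustration only.
    struct Store;
    #[derive(Clone, Copy)]
    enum TaskStatus {
        Complete,
        Failure,
    }

    impl Store {
        async fn set_task_statuses(
            &self,
            _batch: &[(String, TaskStatus)],
        ) -> io::Result<()> {
            Ok(())
        }
    }

    // Drain the buffer, attempt one batched write, and push the batch back
    // on failure so the next tick retries the same IDs.
    async fn flush(buffer: &mut Vec<(String, TaskStatus)>, store: &Store) {
        let batch = std::mem::take(buffer);
        if batch.is_empty() {
            return;
        }
        if let Err(err) = store.set_task_statuses(&batch).await {
            eprintln!("status flush failed, retrying next tick: {err}");
            buffer.extend(batch); // the batch goes back untouched
        }
    }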

george-sentry and others added 2 commits May 11, 2026 09:15
Co-authored-by: Markus Unterwaditzer <markus-github@unterwaditzer.net>
@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 25dce9e.

@markstory (Member) left a comment:

Makes sense to me. I didn't see any paths that would lead to data loss. The additional queue could mean that writes the client considers accepted are lost during a crash. While we'll run the task an additional time, we shouldn't lose any data, as the activations will still be in Postgres.

@george-sentry (Member, Author) commented May 14, 2026

> The additional queue could mean that writes the client considers accepted are lost during a crash. While we'll run the task an additional time, we shouldn't lose any data, as the activations will still be in Postgres.

True. I also thought about it this way.

If I batch updates, set_task_status will run faster so the taskworker's internal result queue will drain quicker. If the taskbroker crashes, the queued updates will be lost.

If I don't batch updates, set_task_status will run slower so the taskworker's internal result queue will drain slower. If the taskworker crashes, the queued updates will be lost.

Regardless, the outcome is roughly the same -- a certain number of tasks are lost and executed a second time. The only difference is whether they are queued in the taskbroker or the taskworker.

@george-sentry george-sentry merged commit 4567466 into main May 14, 2026
24 checks passed
@george-sentry george-sentry deleted the george/push-taskbroker/batch-updates branch May 14, 2026 22:01
Comment thread src/fetch/mod.rs
}

_ = async {
let start = Instant::now();
Review comment:

Why this change?

@george-sentry (Member, Author) replied:

Not sure. I'm moving it back to where it was in #637.

Comment thread src/grpc/server.rs
Comment on lines +281 to +283
for id in ids {
buffer.push((id, status));
}
Review comment:
Let's say the request fails because the DB has a problem (not a transient one) or it is saturated.
Are we going to pile up requests until the broker runs out of memory? This would leave the database in an inconsistent state that will be very hard to troubleshoot.

Do we go through this code only when the worker updates the status, or also when we claim tasks or reset them during upkeep? The upkeep and claim phases have an easier and more deterministic way to batch requests, so they should not go through this.

@george-sentry (Member, Author) replied:

I don't think that'll happen because the buffer is bounded, meaning requests won't pile up. It's not obvious here because the logic that limits buffer size is in flusher.rs.

When I simulated various database issues during testing (such as high latency and connection reset errors), this did not appear to be a problem. Taskbroker was able to recover on its own as soon as those issues were resolved.

@george-sentry (Member, Author) added:

But it may be worth testing again because that was a while ago.

@george-sentry (Member, Author) added:

Correction... there was a bug in the buffering logic in flusher.rs that would result in unbounded growth. I'm fixing it in #637.
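
For illustration, one way to bound the buffer is a select! precondition that disables the recv arm once the buffer is full, so backpressure lands on the channel rather than the buffer (a sketch of the intended behavior, not the code from #637):

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::interval;

    async fn bounded_flusher(mut rx: mpsc::Receiver<u64>, max_buffer_size: usize) {
        let mut buffer: Vec<u64> = Vec::new();
        let mut ticker = interval(Duration::from_millis(100));
        loop {
            tokio::select! {
                // The `if` guard disables this arm while the buffer is
                // full, so unread IDs wait in the (bounded) channel.
                Some(id) = rx.recv(), if buffer.len() < max_buffer_size => {
                    buffer.push(id);
                }
                _ = ticker.tick() => {
                    // Stand-in for the batched DB write; on failure the
                    // IDs would be pushed back rather than cleared.
                    buffer.clear();
                }
            }
        }
    }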
