-
-
Notifications
You must be signed in to change notification settings - Fork 6
feat(taskbroker): Batch Status Updates #618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e707b7e
b7ef805
a2fab2c
c96955e
1e79bfc
5c60034
f46c1f2
a932f39
2e77b13
25dce9e
37c336c
e719dd3
9bef975
099f363
727b378
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::time::Duration; | ||
|
|
||
| use anyhow::Result; | ||
| use tokio::sync::mpsc::Receiver; | ||
|
|
||
| /// Run flusher that receives values of type T from a channel and flushes | ||
| /// them using the provided async `flush` function either when the batch is | ||
| /// full or when the max flush interval has elapsed. This function is **not** | ||
| /// responsible for draining the buffer - `flush` does that. | ||
| pub async fn run_flusher<T, F>( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery. |
||
| mut rx: Receiver<T>, | ||
| batch_size: usize, | ||
| interval_ms: u64, | ||
| mut flush: F, | ||
| ) -> Result<()> | ||
| where | ||
| F: for<'a> FnMut(&'a mut Vec<T>) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>, | ||
| { | ||
| let batch_size = batch_size.max(1); | ||
| let interval_ms = interval_ms.max(1); | ||
|
|
||
| let period = Duration::from_millis(interval_ms); | ||
| let mut interval = tokio::time::interval(period); | ||
| interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
|
||
| let mut buffer: Vec<T> = Vec::with_capacity(batch_size); | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
| msg = rx.recv() => { | ||
| match msg { | ||
| Some(v) => { | ||
| buffer.push(v); | ||
|
|
||
| while buffer.len() < batch_size && let Ok(update) = rx.try_recv() { | ||
| buffer.push(update); | ||
| } | ||
|
|
||
| if buffer.len() >= batch_size { | ||
| flush(&mut buffer).await; | ||
| } | ||
| } | ||
|
|
||
| None => { | ||
| // Channel closed (shutdown), flush remaining and exit | ||
| flush(&mut buffer).await; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| _ = interval.tick() => { | ||
| if !buffer.is_empty() { | ||
| flush(&mut buffer).await; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
// gRPC server surface: authentication and metrics middleware plus the
// tonic service implementation.
pub mod auth_middleware;
pub mod metrics_middleware;
pub mod server;

// Unit tests for `server`; compiled only in test builds.
#[cfg(test)]
mod server_tests;
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,18 @@ | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
| use std::time::Instant; | ||
|
|
||
| use anyhow::Result; | ||
| use chrono::Utc; | ||
| use prost::Message; | ||
| use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; | ||
| use sentry_protos::taskbroker::v1::{ | ||
| FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse, | ||
| TaskActivation, TaskActivationStatus, | ||
| }; | ||
| use tokio::sync::mpsc::Sender; | ||
| use tonic::{Request, Response, Status}; | ||
| use tracing::{error, instrument, warn}; | ||
| use tracing::{debug, error, instrument, warn}; | ||
|
|
||
| use crate::config::{Config, DeliveryMode}; | ||
| use crate::store::activation::InflightActivationStatus; | ||
|
|
@@ -18,6 +21,7 @@ use crate::store::traits::InflightActivationStore; | |
/// State for the gRPC consumer service implementation.
pub struct TaskbrokerServer {
    // Backing store for inflight task activations.
    pub store: Arc<dyn InflightActivationStore>,
    // Shared runtime configuration.
    pub config: Arc<Config>,
    // When set, status updates are sent on this channel for batched
    // flushing instead of being written to the store synchronously.
    pub update_tx: Option<Sender<StatusUpdate>>,
}
|
|
||
| #[tonic::async_trait] | ||
|
|
@@ -97,10 +101,20 @@ impl ConsumerService for TaskbrokerServer { | |
| "Invalid status, expects 3 (Failure), 4 (Retry), or 5 (Complete), but got: {status:?}" | ||
| ))); | ||
| } | ||
|
|
||
| if status == InflightActivationStatus::Failure { | ||
| metrics::counter!("grpc_server.set_status.failure").increment(1); | ||
| } | ||
|
|
||
| if let Some(ref tx) = self.update_tx { | ||
| tx.send((id, status)) | ||
| .await | ||
| .map_err(|_| Status::internal("Status update channel closed"))?; | ||
|
george-sentry marked this conversation as resolved.
|
||
|
|
||
| metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); | ||
| return Ok(Response::new(SetTaskStatusResponse { task: None })); | ||
| } | ||
|
|
||
| match self.store.set_status(&id, status).await { | ||
| Ok(Some(_)) => metrics::counter!( | ||
| "grpc_server.set_status", | ||
|
|
@@ -194,3 +208,80 @@ impl ConsumerService for TaskbrokerServer { | |
| res | ||
| } | ||
| } | ||
|
|
||
| pub type StatusUpdate = (String, InflightActivationStatus); | ||
|
|
||
| pub async fn flush_updates( | ||
| store: Arc<dyn InflightActivationStore>, | ||
| buffer: &mut Vec<StatusUpdate>, | ||
| ) { | ||
| if buffer.is_empty() { | ||
| return; | ||
| } | ||
|
|
||
| let mut by_status: HashMap<InflightActivationStatus, Vec<String>> = HashMap::new(); | ||
|
|
||
| for (id, status) in buffer.drain(..) { | ||
| by_status.entry(status).or_default().push(id); | ||
| } | ||
|
|
||
| for (status, ids) in by_status { | ||
| let requested = ids.len() as u64; | ||
| let st = status.to_string(); | ||
|
|
||
| metrics::histogram!("grpc_server.flush_updates.requested", "status" => st.clone()) | ||
| .record(requested as f64); | ||
|
|
||
| match store.set_status_batch(&ids, status).await { | ||
| Ok(affected) => { | ||
| metrics::histogram!( | ||
| "grpc_server.flush_updates.affected", | ||
| "status" => st.clone() | ||
| ) | ||
| .record(affected as f64); | ||
|
|
||
| metrics::counter!( | ||
| "grpc_server.flush_updates.updated", | ||
| "status" => st.clone() | ||
| ) | ||
| .increment(affected); | ||
|
|
||
| metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1); | ||
|
|
||
| if affected < requested { | ||
| metrics::counter!( | ||
| "grpc_server.flush_updates.partial", | ||
| "status" => st.clone() | ||
| ) | ||
| .increment(1); | ||
|
|
||
| warn!( | ||
| ?status, | ||
| requested, affected, "Updated fewer rows than IDs requested from server" | ||
| ); | ||
| } | ||
|
|
||
| debug!( | ||
| ?status, | ||
| affected, requested, "Flushed status batch from server" | ||
| ); | ||
| } | ||
|
|
||
| Err(e) => { | ||
| metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1); | ||
|
|
||
| error!( | ||
| ?status, | ||
| requested, | ||
| error = ?e, | ||
| "Failed to flush status batch from server" | ||
| ); | ||
|
|
||
| // Push failed updates back into the buffer so they can be retried on next flush | ||
| for id in ids { | ||
| buffer.push((id, status)); | ||
| } | ||
|
sentry[bot] marked this conversation as resolved.
Comment on lines
+281
to
+283
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's say the request fails because the DB has a problem (not a transient one) or it is saturated. Do we go through this code only when the worker updates the status or also when we claim the task or reset them during the upkeep ? Upkeep and claim phase have an easier and more deterministic way to batch requests, so they should not go through this.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that'll happen because the buffer is bounded, meaning requests won't pile up. It's not obvious here because the logic that limits buffer size is in When I simulated various database issues during testing (such as high latency and connection reset errors), this did not appear to be a problem. Taskbroker was able to recover on its own as soon as those issues were resolved.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But it may be worth testing again because that was a while ago.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correction... there was a bug in the buffering logic in |
||
| } | ||
| } | ||
| } | ||
|
george-sentry marked this conversation as resolved.
|
||
| } | ||
|
george-sentry marked this conversation as resolved.
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. I'm moving it back to where it was in #637.