12 changes: 12 additions & 0 deletions src/config.rs
@@ -299,6 +299,15 @@ pub struct Config {
/// Maximum time in milliseconds for a single push RPC to the worker service. This should be greater than the worker's internal timeout.
pub push_timeout_ms: u64,

/// Whether to apply status updates from the gRPC server in batches.
pub batch_status_updates: bool,

/// The number of status updates to accumulate before flushing a batch.
pub status_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of status updates.
pub status_update_interval_ms: u64,

/// The hostname used to construct `callback_url` for task push requests.
pub callback_addr: String,

@@ -409,6 +418,9 @@ impl Default for Config {
push_queue_size: 1,
push_queue_timeout_ms: 5000,
push_timeout_ms: 30000,
batch_status_updates: false,
status_update_batch_size: 1,
status_update_interval_ms: 100,
callback_addr: "0.0.0.0".into(),
callback_port: 50051,
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
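To make the three new options concrete, enabling the batching path might look like the sketch below; only the field names come from this diff, while the values and the struct-update construction are illustrative assumptions:

```rust
// Illustrative values; only the field names are taken from this PR.
let config = Config {
    batch_status_updates: true,
    status_update_batch_size: 500,
    status_update_interval_ms: 100,
    ..Default::default() // falls back to the defaults shown above
};
```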
3 changes: 2 additions & 1 deletion src/fetch/mod.rs
@@ -122,10 +122,11 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
}

_ = async {
let start = Instant::now();
Reviewer: Why this change?

Member Author: Not sure. I'm moving it back to where it was in #637.

debug!("Fetching next batch of pending activations...");
metrics::counter!("fetch.loop.count").increment(1);

let start = Instant::now();
let mut backoff = false;

let result = store.claim_activations_for_push(limit, bucket).await;
8 changes: 8 additions & 0 deletions src/fetch/tests.rs
@@ -122,6 +122,14 @@ impl InflightActivationStore for MockStore {
unimplemented!()
}

async fn set_status_batch(
&self,
_ids: &[String],
_status: InflightActivationStatus,
) -> Result<u64, Error> {
unimplemented!()
}

async fn set_processing_deadline(
&self,
_id: &str,
63 changes: 63 additions & 0 deletions src/flusher.rs
@@ -0,0 +1,63 @@
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use anyhow::Result;
use tokio::sync::mpsc::Receiver;

/// Runs a flusher that receives values of type `T` from a channel and flushes
/// them using the provided async `flush` function, either when the batch is
/// full or when the max flush interval has elapsed. This function is **not**
/// responsible for draining the buffer - `flush` does that.
pub async fn run_flusher<T, F>(
Member Author: I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.

mut rx: Receiver<T>,
batch_size: usize,
interval_ms: u64,
mut flush: F,
) -> Result<()>
where
F: for<'a> FnMut(&'a mut Vec<T>) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
{
let batch_size = batch_size.max(1);
let interval_ms = interval_ms.max(1);

let period = Duration::from_millis(interval_ms);
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut buffer: Vec<T> = Vec::with_capacity(batch_size);

loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Some(v) => {
buffer.push(v);

while buffer.len() < batch_size && let Ok(update) = rx.try_recv() {
buffer.push(update);
}

if buffer.len() >= batch_size {
flush(&mut buffer).await;
}
}

None => {
// Channel closed (shutdown), flush remaining and exit
flush(&mut buffer).await;
break;
}
}
}

_ = interval.tick() => {
if !buffer.is_empty() {
flush(&mut buffer).await;
}
}
}
}

Ok(())
}
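For reference, wiring `run_flusher` up might look like the following minimal sketch. The real wiring is outside this diff, so the channel capacity, the `boxed_flush` helper, and the spawn site are all assumptions; `flush_updates`, `StatusUpdate`, and `TaskbrokerServer` are the items from src/grpc/server.rs below. The helper exists because a bare closure returning a boxed future often fails to satisfy the higher-ranked `for<'a> FnMut` bound through inference alone:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;

// Boxes the future borrowing `buffer` so the closure below matches
// `for<'a> FnMut(&'a mut Vec<T>) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>`.
fn boxed_flush<'a>(
    store: Arc<dyn InflightActivationStore>,
    buffer: &'a mut Vec<StatusUpdate>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
    Box::pin(flush_updates(store, buffer))
}

let (update_tx, update_rx) = mpsc::channel::<StatusUpdate>(10_000); // capacity is an assumption
let flusher_store = store.clone();

tokio::spawn(run_flusher(
    update_rx,
    config.status_update_batch_size,
    config.status_update_interval_ms,
    move |buffer| boxed_flush(flusher_store.clone(), buffer),
));

// The gRPC server then forwards each SetTaskStatus call into the channel.
let server = TaskbrokerServer {
    store,
    config,
    update_tx: Some(update_tx),
};
```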
1 change: 1 addition & 0 deletions src/grpc/mod.rs
@@ -1,5 +1,6 @@
pub mod auth_middleware;
pub mod metrics_middleware;
pub mod server;

#[cfg(test)]
mod server_tests;
93 changes: 92 additions & 1 deletion src/grpc/server.rs
@@ -1,15 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use chrono::Utc;
use prost::Message;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
TaskActivation, TaskActivationStatus,
};
use tokio::sync::mpsc::Sender;
use tonic::{Request, Response, Status};
use tracing::{error, instrument, warn};
use tracing::{debug, error, instrument, warn};

use crate::config::{Config, DeliveryMode};
use crate::store::activation::InflightActivationStatus;
@@ -18,6 +21,7 @@ use crate::store::traits::InflightActivationStore;
pub struct TaskbrokerServer {
pub store: Arc<dyn InflightActivationStore>,
pub config: Arc<Config>,
pub update_tx: Option<Sender<StatusUpdate>>,
}

#[tonic::async_trait]
@@ -97,10 +101,20 @@ impl ConsumerService for TaskbrokerServer {
"Invalid status, expects 3 (Failure), 4 (Retry), or 5 (Complete), but got: {status:?}"
)));
}

if status == InflightActivationStatus::Failure {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

if let Some(ref tx) = self.update_tx {
tx.send((id, status))
.await
.map_err(|_| Status::internal("Status update channel closed"))?;

metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed());
return Ok(Response::new(SetTaskStatusResponse { task: None }));
}

match self.store.set_status(&id, status).await {
Ok(Some(_)) => metrics::counter!(
"grpc_server.set_status",
@@ -194,3 +208,80 @@ impl ConsumerService for TaskbrokerServer {
res
}
}

pub type StatusUpdate = (String, InflightActivationStatus);

pub async fn flush_updates(
store: Arc<dyn InflightActivationStore>,
buffer: &mut Vec<StatusUpdate>,
) {
if buffer.is_empty() {
return;
}

let mut by_status: HashMap<InflightActivationStatus, Vec<String>> = HashMap::new();

for (id, status) in buffer.drain(..) {
by_status.entry(status).or_default().push(id);
}

for (status, ids) in by_status {
let requested = ids.len() as u64;
let st = status.to_string();

metrics::histogram!("grpc_server.flush_updates.requested", "status" => st.clone())
.record(requested as f64);

match store.set_status_batch(&ids, status).await {
Ok(affected) => {
metrics::histogram!(
"grpc_server.flush_updates.affected",
"status" => st.clone()
)
.record(affected as f64);

metrics::counter!(
"grpc_server.flush_updates.updated",
"status" => st.clone()
)
.increment(affected);

metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1);

if affected < requested {
metrics::counter!(
"grpc_server.flush_updates.partial",
"status" => st.clone()
)
.increment(1);

warn!(
?status,
requested, affected, "Updated fewer rows than IDs requested from server"
);
}

debug!(
?status,
affected, requested, "Flushed status batch from server"
);
}

Err(e) => {
metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1);

error!(
?status,
requested,
error = ?e,
"Failed to flush status batch from server"
);

// Push failed updates back into the buffer so they can be retried on next flush
for id in ids {
buffer.push((id, status));
}
Comment on lines +281 to +283:

Reviewer: Let's say the request fails because the DB has a problem (not a transient one) or it is saturated. Are we going to pile up requests until the broker goes out of memory? That would leave the database in an inconsistent state that would be very hard to troubleshoot.

Do we go through this code only when the worker updates the status, or also when we claim tasks or reset them during upkeep? The upkeep and claim phases have an easier and more deterministic way to batch requests, so they should not go through this.

Member Author: I don't think that'll happen, because the buffer is bounded, meaning requests won't pile up. It's not obvious here because the logic that limits the buffer size is in flusher.rs.

When I simulated various database issues during testing (such as high latency and connection reset errors), this did not appear to be a problem. Taskbroker was able to recover on its own as soon as those issues were resolved.

Member Author: But it may be worth testing again, because that was a while ago.

Member Author: Correction... there was a bug in the buffering logic in flusher.rs that would result in unbounded growth. I'm fixing it in #637.

}
}
}
}
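The store-side `set_status_batch` implementation isn't part of this diff (only its mock in src/fetch/tests.rs is), but its shape is clear from the call site in `flush_updates`: one statement that updates many rows and reports how many were affected. A hypothetical sqlx-style sketch, where the table name, column names, status encoding, and `self.pool` are all assumptions:

```rust
// Hypothetical implementation; collapses N individual `set_status` round trips
// into a single UPDATE ... WHERE id IN (...) statement.
async fn set_status_batch(
    &self,
    ids: &[String],
    status: InflightActivationStatus,
) -> Result<u64, Error> {
    if ids.is_empty() {
        return Ok(0);
    }
    // Build one placeholder per id: "?, ?, ?, ..."
    let placeholders = vec!["?"; ids.len()].join(", ");
    let sql = format!(
        "UPDATE inflight_taskactivations SET status = ? WHERE id IN ({placeholders})"
    );
    let mut query = sqlx::query(&sql).bind(status as i32); // assumes an integer status encoding
    for id in ids {
        query = query.bind(id);
    }
    Ok(query.execute(&self.pool).await?.rows_affected())
}
```

The returned `rows_affected` count is what lets `flush_updates` detect partial updates (`affected < requested`) and emit the `grpc_server.flush_updates.partial` metric.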