16 changes: 14 additions & 2 deletions src/config.rs
@@ -308,10 +308,19 @@ pub struct Config {
/// Maximum milliseconds to wait before flushing a batch of status updates.
pub status_update_interval_ms: u64,

- /// The hostname used to construct `callback_url` for task push requests.
+ /// Update claimed → processing (dispatch) updates in batches?
+ pub batch_push_updates: bool,
+
+ /// The size of a batch of dispatch updates.
+ pub push_update_batch_size: usize,
+
+ /// Maximum milliseconds to wait before flushing a batch of dispatch updates.
+ pub push_update_interval_ms: u64,
Comment (Member, Author):

As soon as all this is done, we should probably reorganize the configuration. It has become enormous 😅

Comment (Member):

We could break it up into a few different nested structures.
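
A rough sketch of what such nesting could look like, assuming the three batching fields for status updates and push updates are grouped into one shared struct. The names `BatchConfig` and `NestedConfig` are hypothetical and do not appear in this PR; the defaults mirror the flat ones in `impl Default for Config`. How this would interact with the way the real `Config` is loaded is not addressed here.

```rust
/// Hypothetical grouping of the batching knobs shared by both flushers.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchConfig {
    /// Whether updates are batched at all.
    pub enabled: bool,
    /// Number of updates per batch.
    pub batch_size: usize,
    /// Maximum milliseconds to wait before flushing a partial batch.
    pub interval_ms: u64,
}

impl Default for BatchConfig {
    fn default() -> Self {
        // Same values as the flat defaults in this diff: off, size 1, 100 ms.
        Self {
            enabled: false,
            batch_size: 1,
            interval_ms: 100,
        }
    }
}

/// Hypothetical top-level struct with one `BatchConfig` per kind of update.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct NestedConfig {
    /// Replaces `batch_status_updates`, `status_update_batch_size`, `status_update_interval_ms`.
    pub status_updates: BatchConfig,
    /// Replaces `batch_push_updates`, `push_update_batch_size`, `push_update_interval_ms`.
    pub push_updates: BatchConfig,
}
```

With this shape, a call site would read `config.push_updates.batch_size` instead of `config.push_update_batch_size`, and a third flusher would only need one new field.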


+ /// (DEPRECATED) The hostname used to construct `callback_url` for task push requests.
pub callback_addr: String,

- /// The port used to construct `callback_url` for task push requests.
+ /// (DEPRECATED) The port used to construct `callback_url` for task push requests.
pub callback_port: u32,

/// Maps every application to its worker endpoint, both represented as strings.
@@ -421,6 +430,9 @@ impl Default for Config {
batch_status_updates: false,
status_update_batch_size: 1,
status_update_interval_ms: 100,
+ batch_push_updates: false,
+ push_update_batch_size: 1,
+ push_update_interval_ms: 100,
callback_addr: "0.0.0.0".into(),
callback_port: 50051,
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
3 changes: 1 addition & 2 deletions src/fetch/mod.rs
@@ -122,11 +122,10 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
}

_ = async {
- let start = Instant::now();
-
debug!("Fetching next batch of pending activations...");
metrics::counter!("fetch.loop.count").increment(1);

+ let start = Instant::now();
Comment (Member, Author):

Starting the timer here results in more accurate metrics. It was already like this before but I unintentionally moved it in #618.

let mut backoff = false;

let result = store.claim_activations_for_push(limit, bucket).await;
6 changes: 5 additions & 1 deletion src/fetch/tests.rs
@@ -98,10 +98,14 @@ impl InflightActivationStore for MockStore {
})
}

- async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> {
+ async fn mark_processing(&self, _id: &str) -> Result<(), Error> {
Comment (Member, Author):

Minor naming change to match mark_completed and shorten things.

Ok(())
}

+ async fn mark_processing_batch(&self, _ids: &[String]) -> Result<u64, Error> {
+ unimplemented!()
+ }

async fn pending_activation_max_lag(&self, _now: &DateTime<Utc>) -> f64 {
unimplemented!()
}
37 changes: 32 additions & 5 deletions src/flusher.rs
@@ -3,7 +3,9 @@ use std::pin::Pin;
use std::time::Duration;

use anyhow::Result;
+ use elegant_departure::get_shutdown_guard;
use tokio::sync::mpsc::Receiver;
+ use tracing::debug;

/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
@@ -27,9 +29,16 @@ where

let mut buffer: Vec<T> = Vec::with_capacity(batch_size);

+ let guard = get_shutdown_guard().shutdown_on_drop();
+
loop {
tokio::select! {
- msg = rx.recv() => {
+ biased;
+
+ // When the buffer is NOT full, try to receive another message
+ msg = rx.recv(), if buffer.len() < batch_size => {
+ debug!("Buffer is NOT full, receiving a message...");

match msg {
Some(v) => {
buffer.push(v);
@@ -39,25 +48,43 @@ where
}

if buffer.len() >= batch_size {
+ debug!("Flushing full buffer...");
flush(&mut buffer).await;
}
}

None => {
- // Channel closed (shutdown), flush remaining and exit
- flush(&mut buffer).await;
+ // Channel closed
+ debug!("Channel closed!");
break;
}
}
}

+ // Otherwise, try flushing whatever is in the buffer every `interval_ms` milliseconds
_ = interval.tick() => {
- if !buffer.is_empty() {
- flush(&mut buffer).await;
+ debug!("Performing periodic flush...");
+
+ if rx.is_closed() {
+ debug!("Channel closed on tick!");
+ break;
}
+
+ flush(&mut buffer).await;
}

+ _ = guard.wait() => {
+ debug!("Shutdown guard triggered!");
+ break;
+ }
}
}

+ // Drain and flush before exit
+ while let Ok(update) = rx.try_recv() {
+ buffer.push(update);
+ }
+
+ flush(&mut buffer).await;
Ok(())
}
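
The size-or-interval pattern in this file can be illustrated without an async runtime. The following is a minimal, synchronous sketch using only the standard library, not the PR's implementation: `run_flusher_sync` is a hypothetical name, `recv_timeout` stands in for the `tokio::select!` over `rx.recv()` and `interval.tick()`, there is no shutdown guard, and unlike the code above it skips the periodic flush when the buffer is empty.

```rust
use std::mem;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical std-only flusher: hands a batch to `flush` when it reaches
/// `batch_size`, when `interval` elapses, or when the channel closes.
pub fn run_flusher_sync<T>(
    rx: Receiver<T>,
    batch_size: usize,
    interval: Duration,
    mut flush: impl FnMut(Vec<T>),
) {
    let batch_size = batch_size.max(1);
    let mut buffer: Vec<T> = Vec::with_capacity(batch_size);
    let mut deadline = Instant::now() + interval;

    loop {
        // Wait for a message, but no longer than the time left until the next tick.
        let timeout = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            Ok(value) => {
                buffer.push(value);
                if buffer.len() >= batch_size {
                    // Full batch: hand it over and start a fresh buffer.
                    flush(mem::take(&mut buffer));
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                // Periodic flush of a partial batch, then schedule the next tick.
                if !buffer.is_empty() {
                    flush(mem::take(&mut buffer));
                }
                deadline = Instant::now() + interval;
            }
            // All senders dropped and the queue is empty.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    // Final flush of whatever is left before exiting.
    if !buffer.is_empty() {
        flush(buffer);
    }
}
```

Sending five values through a channel, dropping the sender, and running this with `batch_size = 2` yields two full batches followed by a final batch of one from the exit path.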
3 changes: 3 additions & 0 deletions src/grpc/server.rs
@@ -107,6 +107,9 @@ impl ConsumerService for TaskbrokerServer {
}

if let Some(ref tx) = self.update_tx {
+ let depth = tx.max_capacity() - tx.capacity();
+ metrics::gauge!("grpc_server.update_queue.depth").set(depth as f64);

tx.send((id, status))
.await
.map_err(|_| Status::internal("Status update channel closed"))?;
34 changes: 30 additions & 4 deletions src/main.rs
@@ -16,7 +16,7 @@ use taskbroker::config::{Config, DatabaseAdapter, DeliveryMode};
use taskbroker::fetch::FetchPool;
use taskbroker::grpc::auth_middleware::AuthLayer;
use taskbroker::grpc::metrics_middleware::MetricsLayer;
- use taskbroker::grpc::server::{TaskbrokerServer, flush_updates};
+ use taskbroker::grpc::server::TaskbrokerServer;
use taskbroker::kafka::admin::create_missing_topics;
use taskbroker::kafka::consumer::start_consumer;
use taskbroker::kafka::deserialize::{self, DeserializeConfig};
@@ -27,7 +27,6 @@ use taskbroker::kafka::inflight_activation_writer::{
ActivationWriterConfig, InflightActivationWriter,
};
use taskbroker::kafka::os_stream_writer::{OsStream, OsStreamWriter};
- use taskbroker::logging;
use taskbroker::metrics;
use taskbroker::processing_strategy;
use taskbroker::push::PushPool;
@@ -40,6 +39,7 @@ use taskbroker::store::traits::InflightActivationStore;
use taskbroker::upkeep::upkeep;
use taskbroker::{Args, get_version};
use taskbroker::{SERVICE_NAME, flusher};
+ use taskbroker::{grpc, logging, push};

async fn log_task_completion<T: AsRef<str>>(name: T, task: JoinHandle<Result<(), Error>>) {
match task.await {
@@ -203,7 +203,7 @@ async fn main() -> Result<(), Error> {
rx,
flusher_config.status_update_batch_size,
flusher_config.status_update_interval_ms,
- move |buffer| Box::pin(flush_updates(flusher_store.clone(), buffer)),
+ move |buffer| Box::pin(grpc::server::flush_updates(flusher_store.clone(), buffer)),
)
.await
});
@@ -265,8 +265,30 @@ async fn main() -> Result<(), Error> {
}
});

+ // Push update flush task
+ let (push_update_tx, push_update_task) = if config.batch_push_updates {
+ let (tx, rx) = tokio::sync::mpsc::channel(config.push_update_batch_size.max(1));
Comment (Contributor):

What is the reasoning behind setting the channel size to the batch flush size? Did we do any testing around how changing the amount of slack between these two settings affects the system?

Comment (Contributor):

One interesting metric would be the channel's depth, which might give some intuition about whether producers want to send faster than the flusher drains.

Comment (Member, Author):

Good point. Added gauges for grpc_server.update_queue.depth and push.update_queue.depth.

Comment (Member, Author):

And no, I haven't extensively tested different channel / batch flush sizes. Good idea for the future.
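
To make the question about slack between channel size and batch size concrete, here is a small std-only sketch. It uses `std::sync::mpsc::sync_channel` as a stand-in for the bounded `tokio::sync::mpsc::channel` in this PR, and the function name `accepted_while_flusher_busy` is hypothetical. It counts how many sends a bounded channel accepts while nothing is draining it, which models producers running while the flusher is in the middle of a flush. It says nothing about real throughput, which would need the testing mentioned above.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: how many of `attempts` sends a bounded channel of
/// `capacity` accepts while the receiver is not draining it.
pub fn accepted_while_flusher_busy(capacity: usize, attempts: usize) -> usize {
    // Keep the receiver alive but never read from it.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;

    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The channel is full: an async producer would have to wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    accepted
}
```

When the capacity equals the batch size, producers can queue at most one further batch before they have to wait. A larger capacity adds slack at the cost of more updates held in memory.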


+ let flusher_store = store.clone();
+ let flusher_config = config.clone();
+
+ let handle = tokio::spawn(async move {
+ flusher::run_flusher(
+ rx,
+ flusher_config.push_update_batch_size,
+ flusher_config.push_update_interval_ms,
+ move |buffer| Box::pin(push::flush_updates(flusher_store.clone(), buffer)),
+ )
+ .await
+ });
+
+ (Some(tx), Some(handle))
+ } else {
+ (None, None)
+ };

// Initialize push and fetch pools
- let push_pool = Arc::new(PushPool::new(config.clone(), store.clone()));
+ let push_pool = Arc::new(PushPool::new(config.clone(), store.clone(), push_update_tx));
let fetch_pool = FetchPool::new(store.clone(), config.clone(), push_pool.clone());

// Initialize push threads
@@ -305,6 +327,10 @@ async fn main() -> Result<(), Error> {
departure = departure.on_completion(log_task_completion("status_update_task", task));
}

+ if let Some(task) = push_update_task {
+ departure = departure.on_completion(log_task_completion("push_update_task", task));
+ }

departure.await;
Ok(())
}