Skip to content

Commit edf6185

Browse files
committed
Drop forgotten jobs in another thread
To avoid blocking the event loop.
1 parent 3efb6e2 commit edf6185

1 file changed

Lines changed: 27 additions & 5 deletions

File tree

  • crates/hyperqueue/src/server/client

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::sync::{Notify, mpsc};
1414
use crate::client::status::{Status, job_status};
1515
use crate::common::serverdir::ServerDir;
1616
use crate::server::event::Event;
17-
use crate::server::job::JobTaskState;
17+
use crate::server::job::{Job, JobTaskState};
1818
use crate::server::state::{State, StateRef};
1919
use crate::transfer::connection::accept_client;
2020
use crate::transfer::messages::{
@@ -278,7 +278,18 @@ pub async fn client_rpc_loop<
278278
response
279279
}
280280
FromClientMessage::ForgetJob(msg) => {
281-
handle_job_forget(&state_ref, senders, &msg.selector, msg.filter)
281+
let (msg, forgotten_jobs) =
282+
handle_job_forget(&state_ref, senders, &msg.selector, msg.filter);
283+
// The Drop implementation (even for a single job! because it can have
284+
// thousands of tasks) can take a long time and stall the event loop.
285+
// So we perform it on another thread.
286+
if !forgotten_jobs.is_empty() {
287+
let _ = tokio::task::spawn_blocking(move || {
288+
drop(forgotten_jobs);
289+
})
290+
.await;
291+
}
292+
msg
282293
}
283294
FromClientMessage::JobDetail(msg) => {
284295
compute_job_detail(&state_ref, msg.job_id_selector, msg.task_selector)
@@ -742,16 +753,22 @@ async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> C
742753
}
743754
}
744755

756+
/// Forgetting jobs can release a lot of memory and perform a lot of destructors.
757+
/// Since this happens in synchronous code, it can stall the event loop, which can lead to e.g.
758+
/// missing worker heartbeats and other problems.
759+
///
760+
/// We thus return the forgotten jobs, so that we can drop them later in a separate thread.
745761
fn handle_job_forget(
746762
state_ref: &StateRef,
747763
senders: &Senders,
748764
selector: &IdSelector,
749765
allowed_statuses: Vec<Status>,
750-
) -> ToClientMessage {
766+
) -> (ToClientMessage, Vec<Job>) {
751767
let mut state = state_ref.get_mut();
752768
let job_ids: Vec<JobId> = get_job_ids(&state, selector);
753769
let mut forgotten: usize = 0;
754770

771+
let mut forgotten_jobs = Vec::with_capacity(job_ids.len());
755772
for &job_id in &job_ids {
756773
let can_be_forgotten = state
757774
.get_job(job_id)
@@ -760,7 +777,9 @@ fn handle_job_forget(
760777
})
761778
.unwrap_or(false);
762779
if can_be_forgotten {
763-
state.forget_job(job_id);
780+
if let Some(job) = state.forget_job(job_id) {
781+
forgotten_jobs.push(job);
782+
}
764783
forgotten += 1;
765784
}
766785
}
@@ -769,7 +788,10 @@ fn handle_job_forget(
769788

770789
let ignored = job_ids.len() - forgotten;
771790

772-
ToClientMessage::ForgetJobResponse(ForgetJobResponse { forgotten, ignored })
791+
(
792+
ToClientMessage::ForgetJobResponse(ForgetJobResponse { forgotten, ignored }),
793+
forgotten_jobs,
794+
)
773795
}
774796

775797
fn handle_get_list(state_ref: &StateRef, workers: bool) -> ToClientMessage {

0 commit comments

Comments
 (0)