Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions dash-spv-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,41 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new(
}
}

/// Maximum time to wait for the run task to exit cooperatively before aborting.
const RUN_TASK_SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl FFIDashSpvClient {
/// Cancel the run task and wait for it to finish.
fn cancel_run_task(&self) {
/// Wait for the run task to finish cooperatively, aborting only on timeout.
///
/// The caller must cancel `shutdown_token` before calling this so that
/// `DashSpvClient::run()` exits its loop and cleans up monitor tasks.
/// Only falls back to `abort()` if the task doesn't exit within the timeout.
fn wait_for_run_task(&self) {
let task = self.run_task.lock().unwrap().take();
if let Some(task) = task {
task.abort();
self.runtime.block_on(async {
let _ = task.await;
if let Some(mut task) = task {
let finished = self.runtime.block_on(async {
tokio::time::timeout(RUN_TASK_SHUTDOWN_TIMEOUT, &mut task).await
});
match finished {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("Run task exited with join error: {}", e),
Err(_) => {
tracing::warn!(
"Run task did not exit within {:?}, aborting",
RUN_TASK_SHUTDOWN_TIMEOUT,
);
task.abort();
let _ = self.runtime.block_on(task);
}
}
}
}
}

fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> {
client.shutdown_token.cancel();

client.cancel_run_task();
client.wait_for_run_task();

let result = client.runtime.block_on(async { client.inner.stop().await });

Expand Down Expand Up @@ -334,17 +352,18 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie
if !client.is_null() {
let client = Box::from_raw(client);

// Cancel shutdown token to stop all tasks
// Cancel shutdown token so run() exits its loop and cleans up
client.shutdown_token.cancel();

// Stop the SPV client
// Wait for the run task to finish (cooperative, with timeout fallback)
client.wait_for_run_task();

// Stop the SPV client (run() calls stop() internally, but this
// handles the case where run() was never called or was aborted)
client.runtime.block_on(async {
let _ = client.inner.stop().await;
});

// Abort and await the run task
client.cancel_run_task();

tracing::info!("FFI client destroyed and all tasks cleaned up");
}
}
Expand Down
Loading