From d06440636152373f93ed80d045011dc610d2b32e Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 30 May 2026 22:12:12 +0800 Subject: [PATCH 1/2] refactor: Async should handle flush error in worker thread Signed-off-by: tison --- appenders/async/src/append.rs | 8 +++----- appenders/async/src/lib.rs | 3 +-- appenders/async/src/worker.rs | 9 ++------- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/appenders/async/src/append.rs b/appenders/async/src/append.rs index dad1dd7..a800a7c 100644 --- a/appenders/async/src/append.rs +++ b/appenders/async/src/append.rs @@ -77,11 +77,9 @@ impl Append for Async { let task = Task::Flush { done: done_tx }; self.state.send_task(task)?; - match done_rx.recv() { - Ok(None) => Ok(()), - Ok(Some(err)) => Err(err), - Err(err) => Err(Error::new("worker exited before completing flush").with_source(err)), - } + done_rx + .recv() + .map_err(|err| Error::new("worker exited before completing flush").with_source(err)) } } diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index f237cb7..107d381 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -17,7 +17,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![deny(missing_docs)] -use logforth_core::Error; use logforth_core::kv; use logforth_core::record::RecordOwned; @@ -35,7 +34,7 @@ enum Task { diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, }, Flush { - done: oneshot::Sender>, + done: oneshot::Sender<()>, }, } diff --git a/appenders/async/src/worker.rs b/appenders/async/src/worker.rs index 96be16e..c9a02bc 100644 --- a/appenders/async/src/worker.rs +++ b/appenders/async/src/worker.rs @@ -67,17 +67,12 @@ impl Worker { }); } Task::Flush { done } => { - let mut error = None; for append in appends.iter() { if let Err(err) = append.flush() { - error = Some( - error - .unwrap_or_else(|| Error::new("failed to flush appender")) - .with_source(err), - ); + trap.trap(&err); } } - let _ = done.send(error); + let _ = done.send(()); } } } From 39085497e57a14ea77291ad51699e2e22582c65a Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 30 May 2026 22:25:54 +0800 Subject: [PATCH 2/2] fixup Signed-off-by: tison --- appenders/async/tests/flushes.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/appenders/async/tests/flushes.rs b/appenders/async/tests/flushes.rs index efb22c6..3ea0af1 100644 --- a/appenders/async/tests/flushes.rs +++ b/appenders/async/tests/flushes.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::sync::Barrier; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use std::sync::mpsc; use logforth_append_async::AsyncBuilder; use logforth_core::Append; @@ -56,10 +57,14 @@ impl Append for FailingFlush { } #[derive(Debug)] -struct NoopTrap; +struct CapturedTrap { + errors: mpsc::Sender, +} -impl Trap for NoopTrap { - fn trap(&self, _: &Error) {} +impl Trap for CapturedTrap { + fn trap(&self, err: &Error) { + self.errors.send(err.to_string()).unwrap(); + } } #[test] @@ -92,14 +97,16 @@ fn flush_waits_for_worker_completion() { } #[test] -fn flush_propagates_errors() { +fn flush_handles_errors_in_worker_thread() { + let (tx, rx) = mpsc::channel(); + let async_append = AsyncBuilder::new("async-flush-error") - .trap(NoopTrap) + .trap(CapturedTrap { errors: tx }) .append(FailingFlush) .build(); - let err = async_append.flush().unwrap_err(); - let err = err.to_string(); - assert!(err.contains("failed to flush")); + async_append.flush().unwrap(); + + let err = rx.recv().unwrap(); assert!(err.contains("flush failed")); }