From eb6e5edeb98fc2acbbb6a100dafdcb7b9575f346 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 17 Apr 2026 00:09:27 -0400 Subject: [PATCH 1/2] refactor(cold): unify read backpressure around permit-attached messages Moves semaphore permit acquisition to `ColdStorageHandle` so permits travel with read requests into the channel. The task runner splits into two concurrent subtasks: - **Dispatcher**: pulls `PermittedReadRequest`s and spawns handlers, wrapping each in a per-request deadline (default 5s). - **Writer**: consumes writes sequentially. Drain-before-write uses `Semaphore::acquire_many_owned(64)`, now wrapped in a cancel-select so shutdown cannot hang on a stuck reader. The semaphore is now the single backpressure mechanism. The read channel is sized to match permit count, so `try_send` from a caller holding a permit is guaranteed to have capacity. New `ColdStorageError::Timeout` is returned to callers whose handler exceeds the deadline; dropping the handler future releases its permit back to the pool, so a stuck backend call self-heals. Tests (`crates/cold/tests/concurrency.rs`) add a `GatedBackend` helper and four new regression cases: fairness under saturation, cancel during reader backpressure, cancel during write drain, and operation-deadline permit release. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/cold/src/error.rs | 10 + crates/cold/src/lib.rs | 4 +- crates/cold/src/request.rs | 44 ++- crates/cold/src/task/handle.rs | 43 ++- crates/cold/src/task/runner.rs | 547 +++++++++++++++++++------------ crates/cold/tests/concurrency.rs | 419 ++++++++++++++++++++++- 6 files changed, 835 insertions(+), 232 deletions(-) diff --git a/crates/cold/src/error.rs b/crates/cold/src/error.rs index c92fc7c..873c043 100644 --- a/crates/cold/src/error.rs +++ b/crates/cold/src/error.rs @@ -49,6 +49,16 @@ pub enum ColdStorageError { #[error("stream deadline exceeded")] StreamDeadlineExceeded, + /// A non-streaming read operation exceeded its deadline. + /// + /// The task enforces a wall-clock limit on in-task read handlers so + /// that a stuck backend call cannot indefinitely hold a concurrency + /// permit and block the drain-before-write barrier. Operations that + /// legitimately take longer than this deadline should use + /// [`StreamLogs`](crate::ColdReadRequest::StreamLogs) instead. + #[error("cold read deadline exceeded")] + Timeout, + /// A reorg was detected during a streaming operation. /// /// The anchor block hash changed between chunks, indicating that diff --git a/crates/cold/src/lib.rs b/crates/cold/src/lib.rs index dd61d99..8a889cb 100644 --- a/crates/cold/src/lib.rs +++ b/crates/cold/src/lib.rs @@ -145,7 +145,9 @@ mod error; pub use error::{ColdResult, ColdStorageError}; mod request; -pub use request::{AppendBlockRequest, ColdReadRequest, ColdWriteRequest, Responder}; +pub use request::{ + AppendBlockRequest, ColdReadRequest, ColdWriteRequest, PermittedReadRequest, Responder, +}; mod specifier; pub use alloy::rpc::types::{Filter, Log as RpcLog}; pub use signet_storage_types::{Confirmed, Recovered}; diff --git a/crates/cold/src/request.rs b/crates/cold/src/request.rs index 32ee971..cfffcda 100644 --- a/crates/cold/src/request.rs +++ b/crates/cold/src/request.rs @@ -10,7 +10,7 @@ use crate::{ use alloy::primitives::BlockNumber; use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader}; use std::time::Duration; -use tokio::sync::oneshot; +use tokio::sync::{OwnedSemaphorePermit, oneshot}; /// Response sender type alias that propagates Result types. 
 pub type Responder<T> = oneshot::Sender<ColdResult<T>>;
@@ -24,6 +24,27 @@ pub struct AppendBlockRequest {
     pub resp: Responder<()>,
 }
 
+/// A read request with an attached concurrency permit.
+///
+/// The permit is acquired on the handle side before sending, bounds
+/// concurrent in-flight readers, and doubles as the drain-before-write
+/// marker in the task runner. It is released when the spawned handler
+/// completes (or panics, or is dropped on deadline expiry).
+#[derive(Debug)]
+pub struct PermittedReadRequest {
+    /// The concurrency permit, released when the handler future is dropped.
+    pub permit: OwnedSemaphorePermit,
+    /// The read request itself.
+    pub req: ColdReadRequest,
+}
+
+impl PermittedReadRequest {
+    /// Construct a new permitted request.
+    pub const fn new(permit: OwnedSemaphorePermit, req: ColdReadRequest) -> Self {
+        Self { permit, req }
+    }
+}
+
 /// Read requests for cold storage.
 ///
 /// These requests are processed concurrently (up to 64 in flight).
@@ -139,6 +160,27 @@ pub enum ColdReadRequest {
     },
 }
 
+impl ColdReadRequest {
+    /// Short static name of the request variant, for logging and metrics.
+    pub const fn variant_name(&self) -> &'static str {
+        match self {
+            Self::GetHeader { .. } => "GetHeader",
+            Self::GetHeaders { .. } => "GetHeaders",
+            Self::GetTransaction { .. } => "GetTransaction",
+            Self::GetTransactionsInBlock { .. } => "GetTransactionsInBlock",
+            Self::GetTransactionCount { .. } => "GetTransactionCount",
+            Self::GetReceipt { .. } => "GetReceipt",
+            Self::GetReceiptsInBlock { .. } => "GetReceiptsInBlock",
+            Self::GetSignetEvents { .. } => "GetSignetEvents",
+            Self::GetZenithHeader { .. } => "GetZenithHeader",
+            Self::GetZenithHeaders { .. } => "GetZenithHeaders",
+            Self::GetLogs { .. } => "GetLogs",
+            Self::StreamLogs { .. } => "StreamLogs",
+            Self::GetLatestBlock { .. } => "GetLatestBlock",
+        }
+    }
+}
+
 /// Write requests for cold storage.
 ///
 /// These requests are processed sequentially to maintain ordering.
diff --git a/crates/cold/src/task/handle.rs b/crates/cold/src/task/handle.rs
index 57c2d38..2aa173a 100644
--- a/crates/cold/src/task/handle.rs
+++ b/crates/cold/src/task/handle.rs
@@ -11,13 +11,13 @@
 use crate::{
     AppendBlockRequest, BlockData, ColdReadRequest, ColdReceipt, ColdResult, ColdStorageError,
-    ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
-    SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
+    ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, PermittedReadRequest,
+    ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
 };
 use alloy::primitives::{B256, BlockNumber};
 use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
-use std::time::Duration;
-use tokio::sync::{mpsc, oneshot};
+use std::{sync::Arc, time::Duration};
+use tokio::sync::{Semaphore, mpsc, oneshot};
 
 /// Map a [`mpsc::error::TrySendError`] to the appropriate
 /// [`ColdStorageError`] variant.
@@ -52,22 +52,40 @@ fn map_dispatch_error<T>(e: mpsc::error::TrySendError<T>) -> ColdStorageError {
 /// Multiple readers can query concurrently without blocking writes.
 #[derive(Clone, Debug)]
 pub struct ColdStorageReadHandle {
-    sender: mpsc::Sender<ColdReadRequest>,
+    sender: mpsc::Sender<PermittedReadRequest>,
+    sem: Arc<Semaphore>,
 }
 
 impl ColdStorageReadHandle {
-    /// Create a new read-only handle with the given sender.
-    pub(crate) const fn new(sender: mpsc::Sender<ColdReadRequest>) -> Self {
-        Self { sender }
+    /// Create a new read-only handle with the given sender and permit
+    /// pool.
+    pub(crate) const fn new(
+        sender: mpsc::Sender<PermittedReadRequest>,
+        sem: Arc<Semaphore>,
+    ) -> Self {
+        Self { sender, sem }
     }
 
     /// Send a read request and wait for the response.
+    ///
+    /// Acquires a concurrency permit before sending. The channel is
+    /// sized to match the permit pool, so once a permit is held, send
+    /// is guaranteed to have capacity.
     async fn send<T>(
         &self,
         req: ColdReadRequest,
         rx: oneshot::Receiver<ColdResult<T>>,
     ) -> ColdResult<T> {
-        self.sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
+        let permit = Arc::clone(&self.sem)
+            .acquire_owned()
+            .await
+            .map_err(|_| ColdStorageError::TaskTerminated)?;
+        self.sender.try_send(PermittedReadRequest::new(permit, req)).map_err(|e| match e {
+            mpsc::error::TrySendError::Full(_) => {
+                unreachable!("semaphore permit implies channel capacity")
+            }
+            mpsc::error::TrySendError::Closed(_) => ColdStorageError::TaskTerminated,
+        })?;
         rx.await.map_err(|_| ColdStorageError::Cancelled)?
     }
 
@@ -357,12 +375,13 @@ pub struct ColdStorageHandle {
 }
 
 impl ColdStorageHandle {
-    /// Create a new handle with the given senders.
+    /// Create a new handle with the given senders and permit pool.
     pub(crate) const fn new(
-        read_sender: mpsc::Sender<ColdReadRequest>,
+        read_sender: mpsc::Sender<PermittedReadRequest>,
+        read_sem: Arc<Semaphore>,
         write_sender: mpsc::Sender<ColdWriteRequest>,
     ) -> Self {
-        Self { reader: ColdStorageReadHandle::new(read_sender), write_sender }
+        Self { reader: ColdStorageReadHandle::new(read_sender, read_sem), write_sender }
     }
 
     /// Get a read-only handle that shares the read channel.
diff --git a/crates/cold/src/task/runner.rs b/crates/cold/src/task/runner.rs
index da213be..adc96b2 100644
--- a/crates/cold/src/task/runner.rs
+++ b/crates/cold/src/task/runner.rs
@@ -1,48 +1,67 @@
 //! Cold storage task runner.
 //!
-//! The [`ColdStorageTask`] processes requests from channels and dispatches
-//! them to the storage backend. Reads and writes use separate channels:
+//! The [`ColdStorageTask`] processes requests from two channels and
+//! dispatches them to the storage backend. Internally it runs two
+//! concurrent subtasks:
 //!
-//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks.
-//!   Each reader holds a permit on `read_semaphore` for the lifetime of the
-//!   handler; the semaphore is the backpressure mechanism. In-flight reads
-//!   are drained before each write.
-//! - **Writes**: Processed sequentially (inline await) to maintain ordering.
-//!   Draining is implemented by acquiring all [`MAX_CONCURRENT_READERS`]
-//!   permits on `read_semaphore`, which unblocks only once every in-flight
-//!   reader has finished and released its permit.
-//! - **Streams**: Log-streaming producers run independently, tracked for
-//!   graceful shutdown but not drained before writes. Backends must provide
-//!   their own read isolation (e.g. snapshot semantics).
+//! - **Dispatcher** — consumes [`PermittedReadRequest`]s and spawns a
+//!   handler for each. The permit attached to the request bounds
+//!   concurrency and travels with the message for the lifetime of the
+//!   handler.
+//! - **Writer** — consumes [`ColdWriteRequest`]s sequentially. Before
+//!   each write, it drains all in-flight readers by acquiring every
+//!   permit on the shared semaphore, giving the write exclusive backend
+//!   access. The drain is wrapped in a cancel-select so shutdown can
+//!   preempt a stuck reader.
+//!
+//! The shared [`Semaphore`] is the single backpressure mechanism. Its
+//! permits are acquired on the handle side (see
+//! [`ColdStorageReadHandle`](crate::ColdStorageReadHandle)), travel into
+//! the channel in [`PermittedReadRequest`], and are released when the
+//! spawned handler's future is dropped (on completion, panic, or
+//! deadline expiry).
 //!
 //! Transaction, receipt, and header lookups are served from an LRU cache,
 //! avoiding repeated backend reads for frequently queried items.
 //!
 //! # Log Streaming
 //!
-//! The task owns the streaming configuration (max deadline, concurrency
-//! limit) and delegates the streaming loop to the backend via
-//! [`ColdStorageRead::produce_log_stream`]. Callers supply a per-request
-//! deadline that is clamped to the task's configured maximum.
+//! Log-streaming producers (`StreamLogs`) run independently and are
+//! tracked separately for graceful shutdown. They are NOT drained
+//! before writes. Backends must provide their own read isolation
+//! (snapshot semantics) for streaming reads.
 
 use super::cache::ColdCache;
 use crate::{
     ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle,
-    ColdStorageRead, ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier,
-    TransactionSpecifier,
+    ColdStorageRead, ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, PermittedReadRequest,
+    ReceiptSpecifier, Responder, TransactionSpecifier,
 };
 use signet_storage_types::{RecoveredTx, SealedHeader};
-use std::{sync::Arc, time::Duration};
+use std::{future::Future, sync::Arc, time::Duration};
 use tokio::sync::{Mutex, Semaphore, mpsc};
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_util::{sync::CancellationToken, task::TaskTracker};
-use tracing::{debug, instrument};
+use tracing::{debug, instrument, warn};
 
 /// Default maximum deadline for streaming operations.
 const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);
 
+/// Default per-request deadline for non-streaming reads.
+///
+/// Long enough to absorb normal backend jitter (pool contention,
+/// brief network hiccups) and well below the 12s write cadence at
+/// which a stuck reader would otherwise re-pressurize the write mpsc
+/// and reintroduce the original backpressure-induced crash.
+const DEFAULT_READ_DEADLINE: Duration = Duration::from_secs(5);
+
 /// Channel size for cold storage read requests.
-const READ_CHANNEL_SIZE: usize = 256;
+///
+/// Sized to match [`MAX_CONCURRENT_READERS`]: because callers acquire
+/// a semaphore permit before sending, at most that many items can be
+/// in the channel simultaneously. `try_send` from a caller holding a
+/// permit is therefore guaranteed to have capacity.
+const READ_CHANNEL_SIZE: usize = MAX_CONCURRENT_READERS;
 
 /// Channel size for cold storage write requests.
 const WRITE_CHANNEL_SIZE: usize = 256;
@@ -112,74 +131,121 @@ impl<B: ColdStorage> ColdStorageTaskInner<B> {
         r
     }
 
-    /// Handle a read request, checking the cache first where applicable.
-    async fn handle_read(self: &Arc<Self>, req: ColdReadRequest) {
+    /// Handle a read request with a wall-clock deadline.
+    ///
+    /// On normal completion, the result is sent via the variant's
+    /// oneshot sender. On deadline expiry, the working future is
+    /// dropped and the caller receives [`ColdStorageError::Timeout`].
+    /// [`ColdReadRequest::StreamLogs`] bypasses the deadline — it has
+    /// its own stream deadline.
+    async fn handle_read_req(self: Arc<Self>, req: ColdReadRequest, deadline: Duration) {
+        let op = req.variant_name();
         match req {
             ColdReadRequest::GetHeader { spec, resp } => {
-                let result = if let HeaderSpecifier::Number(n) = &spec {
-                    if let Some(hit) = self.cache.lock().await.get_header(n) {
-                        Ok(Some(hit))
-                    } else {
-                        self.fetch_and_cache_header(spec).await
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    if let HeaderSpecifier::Number(n) = &spec
+                        && let Some(hit) = this.cache.lock().await.get_header(n)
+                    {
+                        return Ok(Some(hit));
                     }
-                } else {
-                    self.fetch_and_cache_header(spec).await
-                };
-                let _ = resp.send(result);
+                    this.fetch_and_cache_header(spec).await
+                })
+                .await;
             }
             ColdReadRequest::GetHeaders { specs, resp } => {
-                let _ = resp.send(self.read_backend.get_headers(specs).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_headers(specs).await
+                })
+                .await;
             }
             ColdReadRequest::GetTransaction { spec, resp } => {
-                let result = if let TransactionSpecifier::BlockAndIndex { block, index } = &spec {
-                    if let Some(hit) = self.cache.lock().await.get_tx(&(*block, *index)) {
-                        Ok(Some(hit))
-                    } else {
-                        self.fetch_and_cache_tx(spec).await
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    if let TransactionSpecifier::BlockAndIndex { block, index } = &spec
+                        && let Some(hit) = this.cache.lock().await.get_tx(&(*block, *index))
+                    {
+                        return Ok(Some(hit));
                     }
-                } else {
-                    self.fetch_and_cache_tx(spec).await
-                };
-                let _ = resp.send(result);
+                    this.fetch_and_cache_tx(spec).await
+                })
+                .await;
             }
             ColdReadRequest::GetTransactionsInBlock { block, resp } => {
-                let _ = resp.send(self.read_backend.get_transactions_in_block(block).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_transactions_in_block(block).await
+                })
+                .await;
             }
             ColdReadRequest::GetTransactionCount { block, resp } => {
-                let _ = resp.send(self.read_backend.get_transaction_count(block).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_transaction_count(block).await
+                })
+                .await;
             }
             ColdReadRequest::GetReceipt { spec, resp } => {
-                let result = if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec {
-                    if let Some(hit) = self.cache.lock().await.get_receipt(&(*block, *index)) {
-                        Ok(Some(hit))
-                    } else {
-                        self.fetch_and_cache_receipt(spec).await
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec
+                        && let Some(hit) = this.cache.lock().await.get_receipt(&(*block, *index))
+                    {
+                        return Ok(Some(hit));
                    }
-                } else {
-                    self.fetch_and_cache_receipt(spec).await
-                };
-                let _ = resp.send(result);
+                    this.fetch_and_cache_receipt(spec).await
+                })
+                .await;
             }
             ColdReadRequest::GetReceiptsInBlock { block, resp } => {
-                let _ = resp.send(self.read_backend.get_receipts_in_block(block).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_receipts_in_block(block).await
+                })
+                .await;
             }
             ColdReadRequest::GetSignetEvents { spec, resp } => {
-                let _ = resp.send(self.read_backend.get_signet_events(spec).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_signet_events(spec).await
+                })
+                .await;
             }
             ColdReadRequest::GetZenithHeader { spec, resp } => {
-                let _ = resp.send(self.read_backend.get_zenith_header(spec).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_zenith_header(spec).await
+                })
+                .await;
             }
             ColdReadRequest::GetZenithHeaders { spec, resp } => {
-                let _ = resp.send(self.read_backend.get_zenith_headers(spec).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_zenith_headers(spec).await
+                })
+                .await;
             }
             ColdReadRequest::GetLogs { filter, max_logs, resp } => {
-                let _ = resp.send(self.read_backend.get_logs(&filter, max_logs).await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_logs(&filter, max_logs).await
+                })
+                .await;
             }
-            ColdReadRequest::StreamLogs { filter, max_logs, deadline, resp } => {
-                let _ = resp.send(self.handle_stream_logs(*filter, max_logs, deadline).await);
+            ColdReadRequest::StreamLogs { filter, max_logs, deadline: stream_deadline, resp } => {
+                // Streams use their own deadline and produce a stream
+                // handle almost immediately. Not wrapped in the read
+                // deadline.
+                let _ =
+                    resp.send(self.handle_stream_logs(*filter, max_logs, stream_deadline).await);
             }
             ColdReadRequest::GetLatestBlock { resp } => {
-                let _ = resp.send(self.read_backend.get_latest_block().await);
+                let this = Arc::clone(&self);
+                run_with_deadline(deadline, op, resp, async move {
+                    this.read_backend.get_latest_block().await
+                })
+                .await;
             }
         }
     }
@@ -230,33 +296,33 @@ impl<B: ColdStorage> ColdStorageTaskInner<B> {
     }
 }
 
-/// The cold storage task that processes requests.
-///
-/// This task receives requests over separate read and write channels and
-/// dispatches them to the storage backend. It supports graceful shutdown
-/// via a cancellation token.
+/// Await `fut` with a wall-clock deadline and send the result via
+/// `resp`.
 ///
-/// # Processing Model
-///
-/// - **Reads**: Spawned as concurrent tasks (up to 64 in flight).
-///   Multiple reads can execute in parallel. In-flight reads are drained
-///   before each write to ensure exclusive backend access.
-/// - **Writes**: Processed inline (sequential). Each write completes before
-///   the next is started, ensuring ordering.
-/// - **Streams**: Log-streaming producer tasks run independently of the
-///   read/write lifecycle. They are tracked separately for graceful
-///   shutdown but are NOT drained before writes. Backends MUST provide
-///   their own read isolation for streaming (snapshot semantics).
-///
-/// This design prioritizes write ordering for correctness while allowing
-/// read throughput to scale with concurrency.
-///
-/// # Log Streaming
+/// On deadline expiry, emits a WARN trace tagged with the operation
+/// name and sends [`ColdStorageError::Timeout`] to the caller. The
+/// working future is dropped, which also drops anything it holds —
+/// including the caller's concurrency permit.
+async fn run_with_deadline<T, F>(deadline: Duration, op: &'static str, resp: Responder<T>, fut: F)
+where
+    F: Future<Output = ColdResult<T>>,
+{
+    let result = match tokio::time::timeout(deadline, fut).await {
+        Ok(r) => r,
+        Err(_) => {
+            warn!(operation = op, ?deadline, "cold read deadline exceeded");
+            Err(ColdStorageError::Timeout)
+        }
+    };
+    let _ = resp.send(result);
+}
+
+/// The cold storage task that processes requests.
 ///
-/// The task owns the streaming configuration (max deadline, concurrency
-/// limit) and delegates the streaming loop to the backend via
-/// [`ColdStorageRead::produce_log_stream`]. Callers supply a per-request
-/// deadline that is clamped to the task's configured maximum.
+/// This task receives requests over separate read and write channels
+/// and dispatches them to the storage backend. Internally it runs a
+/// read dispatcher and a writer as concurrent subtasks, joined for
+/// graceful shutdown. See the module-level documentation for details.
 ///
 /// # Caching
 ///
@@ -266,26 +332,27 @@ impl<B: ColdStorage> ColdStorageTaskInner<B> {
 pub struct ColdStorageTask<B: ColdStorage> {
     inner: Arc<ColdStorageTaskInner<B>>,
     write_backend: B,
-    read_receiver: mpsc::Receiver<ColdReadRequest>,
+    read_receiver: mpsc::Receiver<PermittedReadRequest>,
     write_receiver: mpsc::Receiver<ColdWriteRequest>,
     cancel_token: CancellationToken,
-    /// Tracks spawned read handlers so the graceful-shutdown path can wait
-    /// for them to finish. Not used for backpressure — see `read_semaphore`.
+    /// Tracks spawned read handlers so the graceful-shutdown path can
+    /// wait for them to finish. Not used for backpressure.
     task_tracker: TaskTracker,
-    /// Bounds in-flight read handlers and serves as the drain barrier for
-    /// writes.
+    /// The shared backpressure pool.
     ///
-    /// A reader acquires one permit before being spawned; the permit is
-    /// released when the spawned task completes (or panics). Writes acquire
-    /// all [`MAX_CONCURRENT_READERS`] permits at once, which blocks until
-    /// every in-flight reader has finished, giving the write exclusive
-    /// backend access for the drain-before-write invariant.
+    /// Callers acquire a permit on the handle side before sending.
+    /// Writers acquire every permit at once as the drain-before-write
+    /// barrier.
     read_semaphore: Arc<Semaphore>,
+    /// Per-request deadline applied to non-streaming reads.
+    read_deadline: Duration,
 }
 
 impl<B: ColdStorage> std::fmt::Debug for ColdStorageTask<B> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("ColdStorageTask").finish_non_exhaustive()
+        f.debug_struct("ColdStorageTask")
+            .field("read_deadline", &self.read_deadline)
+            .finish_non_exhaustive()
     }
 }
 
@@ -294,6 +361,7 @@ impl<B: ColdStorage> ColdStorageTask<B> {
     pub fn new(backend: B, cancel_token: CancellationToken) -> (Self, ColdStorageHandle) {
         let (read_sender, read_receiver) = mpsc::channel(READ_CHANNEL_SIZE);
         let (write_sender, write_receiver) = mpsc::channel(WRITE_CHANNEL_SIZE);
+        let read_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_READERS));
         let read_backend = backend.clone();
         let task = Self {
             inner: Arc::new(ColdStorageTaskInner {
@@ -308,137 +376,206 @@ impl<B: ColdStorage> ColdStorageTask<B> {
             write_receiver,
             cancel_token,
             task_tracker: TaskTracker::new(),
-            read_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_READERS)),
+            read_semaphore: Arc::clone(&read_semaphore),
+            read_deadline: DEFAULT_READ_DEADLINE,
         };
-        let handle = ColdStorageHandle::new(read_sender, write_sender);
+        let handle = ColdStorageHandle::new(read_sender, read_semaphore, write_sender);
         (task, handle)
     }
 
+    /// Override the per-request deadline for non-streaming reads.
+    ///
+    /// Defaults to 5 seconds. The deadline is a guardrail against
+    /// backend-layer pathology (stuck connection, runaway query);
+    /// legitimate work should complete well within it.
+    pub const fn with_read_deadline(mut self, deadline: Duration) -> Self {
+        self.read_deadline = deadline;
+        self
+    }
+
     /// Spawn the task and return the handle.
     ///
-    /// The task will run until the cancellation token is triggered or the
-    /// channels are closed.
+    /// The task will run until the cancellation token is triggered or
+    /// the channels are closed.
     pub fn spawn(backend: B, cancel_token: CancellationToken) -> ColdStorageHandle {
         let (task, handle) = Self::new(backend, cancel_token);
         tokio::spawn(task.run());
         handle
     }
 
-    /// Handle a write request using the exclusively owned write backend.
-    ///
-    /// Called inline in the run loop (never spawned). The write backend
-    /// is not shared — no lock acquisition needed. The drain-before-write
-    /// step in `run()` ensures no read tasks are populating the cache
-    /// concurrently, preventing stale cache entries after truncation.
-    async fn handle_write(&mut self, req: ColdWriteRequest) {
-        match req {
-            ColdWriteRequest::AppendBlock(boxed) => {
-                let result = self.write_backend.append_block(boxed.data).await;
-                let _ = boxed.resp.send(result);
-            }
-            ColdWriteRequest::AppendBlocks { data, resp } => {
-                let result = self.write_backend.append_blocks(data).await;
-                let _ = resp.send(result);
-            }
-            ColdWriteRequest::TruncateAbove { block, resp } => {
-                let result = self.write_backend.truncate_above(block).await;
-                if result.is_ok() {
-                    self.inner.cache.lock().await.invalidate_above(block);
-                }
-                let _ = resp.send(result);
-            }
-            ColdWriteRequest::DrainAbove { block, resp } => {
-                let result = self.write_backend.drain_above(block).await;
-                if result.is_ok() {
-                    self.inner.cache.lock().await.invalidate_above(block);
-                }
-                let _ = resp.send(result);
-            }
-        }
-    }
-
     /// Run the task, processing requests until shutdown.
     #[instrument(skip(self), name = "cold_storage_task")]
-    pub async fn run(mut self) {
+    pub async fn run(self) {
         debug!("Cold storage task started");
-        loop {
-            tokio::select! {
-                biased;
+        let Self {
+            inner,
+            write_backend,
+            read_receiver,
+            write_receiver,
+            cancel_token,
+            task_tracker,
+            read_semaphore,
+            read_deadline,
+        } = self;
+
+        let dispatcher = run_dispatcher(
+            read_receiver,
+            Arc::clone(&inner),
+            task_tracker.clone(),
+            cancel_token.clone(),
+            read_deadline,
+        );
+        let writer = run_writer(
+            write_receiver,
+            write_backend,
+            Arc::clone(&inner),
+            read_semaphore,
+            cancel_token,
+        );
+
+        tokio::join!(dispatcher, writer);
+
+        // Graceful shutdown: drain reads first (short-lived, bounded
+        // by `read_deadline`), then streams (bounded by per-stream
+        // deadline). Reads must drain first because StreamLogs
+        // handlers can spawn stream producers — draining streams
+        // before reads could miss a newly-spawned producer.
+        debug!("Waiting for in-progress read handlers to complete");
+        task_tracker.close();
+        task_tracker.wait().await;
+        debug!("Waiting for in-progress stream producers to complete");
+        inner.stream_tracker.close();
+        inner.stream_tracker.wait().await;
+        debug!("Cold storage task shut down gracefully");
+    }
+}
+
+/// Dispatcher subtask: pulls [`PermittedReadRequest`]s and spawns a
+/// handler for each, wrapping the work in the read deadline.
+///
+/// Runs concurrently with the writer so that permits attached to
+/// queued messages always have a consumer, preventing the drain from
+/// stranding permits in the channel.
+async fn run_dispatcher<B: ColdStorage>(
+    mut read_receiver: mpsc::Receiver<PermittedReadRequest>,
+    inner: Arc<ColdStorageTaskInner<B>>,
+    task_tracker: TaskTracker,
+    cancel_token: CancellationToken,
+    deadline: Duration,
+) {
+    loop {
+        tokio::select! {
+            biased;
+            _ = cancel_token.cancelled() => {
+                debug!("Read dispatcher received cancellation signal");
+                break;
+            }
+            maybe = read_receiver.recv() => {
+                let Some(PermittedReadRequest { permit, req }) = maybe else {
+                    debug!("Cold storage read channel closed");
+                    break;
+                };
+                let inner = Arc::clone(&inner);
+                task_tracker.spawn(async move {
+                    // The permit is released when this future is dropped,
+                    // on normal completion, panic, or deadline expiry.
+                    let _permit = permit;
+                    inner.handle_read_req(req, deadline).await;
+                });
+            }
+        }
+    }
+}
+
+/// Writer subtask: consumes writes sequentially, draining all
+/// in-flight readers before each write via
+/// [`Semaphore::acquire_many_owned`].
+///
+/// The drain is wrapped in a cancel-select so that a stuck reader
+/// holding its permit cannot wedge shutdown. Pulling a write from the
+/// channel and then exiting on cancel drops the write's oneshot
+/// sender; the caller sees `RecvError` mapped to `Cancelled`.
+async fn run_writer<B: ColdStorage>(
+    mut write_receiver: mpsc::Receiver<ColdWriteRequest>,
+    mut write_backend: B,
+    inner: Arc<ColdStorageTaskInner<B>>,
+    read_semaphore: Arc<Semaphore>,
+    cancel_token: CancellationToken,
+) {
+    loop {
+        tokio::select! {
+            biased;
+            _ = cancel_token.cancelled() => {
+                debug!("Writer received cancellation signal");
+                break;
+            }
+            maybe = write_receiver.recv() => {
+                let Some(req) = maybe else {
+                    debug!("Cold storage write channel closed");
                     break;
-                }
+                };
 
-            maybe_write = self.write_receiver.recv() => {
-                let Some(req) = maybe_write else {
-                    debug!("Cold storage write channel closed");
+                // Drain in-flight readers by acquiring every permit. Only
+                // completes once no reader holds a permit, which means no
+                // reader is touching the backend.
+                //
+                // `acquire_many_owned` only errors if the semaphore is
+                // closed; the semaphore lives for the lifetime of this
+                // task, so the error is unreachable.
+                let _drain = tokio::select! {
+                    _ = cancel_token.cancelled() => {
+                        debug!("Cancellation while waiting for read drain");
                         break;
-                    };
-                    // Drain in-flight read tasks before executing the write by
-                    // acquiring every read permit. This blocks until all
-                    // in-flight readers have released their permits, giving the
-                    // write exclusive backend access. Stream producers are NOT
-                    // drained here — they rely on backend-level read isolation
-                    // (snapshot semantics).
-                    //
-                    // `acquire_many_owned` only errors if the semaphore is
-                    // closed; the semaphore lives for the lifetime of the run
-                    // loop, so this is infallible here.
-                    let _drain = self
-                        .read_semaphore
+                    }
+                    d = read_semaphore
                         .clone()
-                        .acquire_many_owned(MAX_CONCURRENT_READERS as u32)
-                        .await
-                        .expect("read semaphore outlives the run loop");
-
-                    self.handle_write(req).await;
-                    // `_drain` drops here, restoring all permits.
-                }
+                        .acquire_many_owned(MAX_CONCURRENT_READERS as u32) =>
+                    {
+                        d.expect("read semaphore outlives the writer task")
+                    }
+                };
 
-                maybe_read = self.read_receiver.recv() => {
-                    let Some(req) = maybe_read else {
-                        debug!("Cold storage read channel closed");
-                        break;
-                    };
-
-                    // Apply backpressure: acquire a permit before spawning.
-                    // When the semaphore is saturated (64 in-flight readers, or
-                    // a write holding all permits to drain), this waits until
-                    // a permit becomes available. Cancel-safe: a cancellation
-                    // signal exits the run loop without spawning.
-                    let permit = tokio::select! {
-                        _ = self.cancel_token.cancelled() => {
-                            debug!("Cancellation while waiting for read permit");
-                            break;
-                        }
-                        permit = self.read_semaphore.clone().acquire_owned() => {
-                            permit.expect("read semaphore outlives the run loop")
-                        }
-                    };
-
-                    let inner = Arc::clone(&self.inner);
-                    self.task_tracker.spawn(async move {
-                        // Hold the permit for the lifetime of the handler —
-                        // it is released on completion or panic.
-                        let _permit = permit;
-                        inner.handle_read(req).await;
-                    });
-                }
+                handle_write(&mut write_backend, &inner, req).await;
+                // `_drain` drops here, restoring all permits.
             }
         }
     }
+}
 
-        // Graceful shutdown: drain reads first (short-lived), then streams
-        // (bounded by deadline). Reads must drain first because StreamLogs
-        // requests spawn stream tasks — draining streams before reads could
-        // miss newly spawned producers.
-        debug!("Waiting for in-progress read handlers to complete");
-        self.task_tracker.close();
-        self.task_tracker.wait().await;
-        debug!("Waiting for in-progress stream producers to complete");
-        self.inner.stream_tracker.close();
-        self.inner.stream_tracker.wait().await;
-        debug!("Cold storage task shut down gracefully");
+/// Execute a single write request against the exclusively owned
+/// write backend. Invalidates cache entries after destructive
+/// operations.
+///
+/// The caller must have drained all in-flight readers before calling
+/// this. No synchronization is performed here.
+async fn handle_write<B: ColdStorage>(
+    backend: &mut B,
+    inner: &Arc<ColdStorageTaskInner<B>>,
+    req: ColdWriteRequest,
+) {
+    match req {
+        ColdWriteRequest::AppendBlock(boxed) => {
+            let result = backend.append_block(boxed.data).await;
+            let _ = boxed.resp.send(result);
+        }
+        ColdWriteRequest::AppendBlocks { data, resp } => {
+            let result = backend.append_blocks(data).await;
+            let _ = resp.send(result);
+        }
+        ColdWriteRequest::TruncateAbove { block, resp } => {
+            let result = backend.truncate_above(block).await;
+            if result.is_ok() {
+                inner.cache.lock().await.invalidate_above(block);
+            }
+            let _ = resp.send(result);
+        }
+        ColdWriteRequest::DrainAbove { block, resp } => {
+            let result = backend.drain_above(block).await;
+            if result.is_ok() {
+                inner.cache.lock().await.invalidate_above(block);
+            }
+            let _ = resp.send(result);
+        }
     }
 }
diff --git a/crates/cold/tests/concurrency.rs b/crates/cold/tests/concurrency.rs
index 9e5d950..a6a5817 100644
--- a/crates/cold/tests/concurrency.rs
+++ b/crates/cold/tests/concurrency.rs
@@ -2,16 +2,33 @@
 //!
 //! These tests exercise the read/write concurrency machinery in
 //! [`signet_cold::ColdStorageTask`] directly, independent of any particular
-//! backend. They use the in-memory backend as a fast, deterministic fixture.
+//! backend. They use the in-memory backend (optionally wrapped in a
+//! [`GatedBackend`]) as a fast, deterministic fixture.
-use alloy::consensus::{Header, Sealable};
-use signet_cold::{BlockData, ColdStorageTask, mem::MemColdBackend};
-use std::time::Duration;
-use tokio::time::timeout;
+use alloy::{
+    consensus::{Header, Sealable},
+    primitives::BlockNumber,
+};
+use signet_cold::{
+    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
+    ColdStorageTask, ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier,
+    RpcLog, SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier,
+    mem::MemColdBackend, produce_log_stream_default,
+};
+use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
+use std::{
+    sync::{
+        Arc,
+        atomic::{AtomicUsize, Ordering},
+    },
+    time::Duration,
+};
+use tokio::{sync::Semaphore, time::timeout};
 use tokio_util::sync::CancellationToken;
 
-/// Upper bound for the whole test. Far larger than any correct execution;
-/// small enough that a deadlock regression trips the timeout quickly.
+/// Upper bound for the whole test. Far larger than any correct
+/// execution; small enough that a deadlock regression trips the
+/// timeout quickly.
 const DEADLOCK_GUARD: Duration = Duration::from_secs(15);
 
 fn block(n: u64) -> BlockData {
@@ -19,6 +36,143 @@ fn block(n: u64) -> BlockData {
     BlockData::new(header.seal_slow(), vec![], vec![], vec![], None)
 }
 
+/// Test wrapper that gates every read call on a test-owned semaphore.
+///
+/// Each read method acquires a permit on [`GatedBackend::gate`] before
+/// delegating to the inner backend. Tests control permit availability
+/// to hold readers in a specific state, then release them on demand.
+///
+/// Writes pass through unchanged. `produce_log_stream` is not gated
+/// (streaming isn't part of the drain-before-write invariant).
+#[derive(Clone)]
+struct GatedBackend<B> {
+    inner: B,
+    gate: Arc<Semaphore>,
+    /// Count of read calls that have entered the gate (incremented
+    /// before `acquire`). Lets tests observe how many readers are
+    /// currently parked waiting for a permit.
+    reads_entered: Arc<AtomicUsize>,
+}
+
+impl<B> GatedBackend<B> {
+    fn new(inner: B, initial_permits: usize) -> Self {
+        Self {
+            inner,
+            gate: Arc::new(Semaphore::new(initial_permits)),
+            reads_entered: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    fn gate(&self) -> Arc<Semaphore> {
+        Arc::clone(&self.gate)
+    }
+
+    async fn acquire(&self) {
+        self.reads_entered.fetch_add(1, Ordering::SeqCst);
+        // The `let _` binding drops the permit back to the pool as
+        // soon as it is acquired — we only need to serialize at the
+        // gate, not keep the permit across the delegated call.
+        let _ = self.gate.acquire().await.expect("gate never closed in tests");
+    }
+}
+
+impl<B: ColdStorageRead> ColdStorageRead for GatedBackend<B> {
+    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
+        self.acquire().await;
+        self.inner.get_header(spec).await
+    }
+
+    async fn get_headers(
+        &self,
+        specs: Vec<HeaderSpecifier>,
+    ) -> ColdResult<Vec<Option<SealedHeader>>> {
+        self.acquire().await;
+        self.inner.get_headers(specs).await
+    }
+
+    async fn get_transaction(
+        &self,
+        spec: TransactionSpecifier,
+    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
+        self.acquire().await;
+        self.inner.get_transaction(spec).await
+    }
+
+    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
+        self.acquire().await;
+        self.inner.get_transactions_in_block(block).await
+    }
+
+    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
+        self.acquire().await;
+        self.inner.get_transaction_count(block).await
+    }
+
+    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
+        self.acquire().await;
+        self.inner.get_receipt(spec).await
+    }
+
+    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
+        self.acquire().await;
+        self.inner.get_receipts_in_block(block).await
+    }
+
+    async fn get_signet_events(
+        &self,
+        spec: SignetEventsSpecifier,
+    ) -> ColdResult<Vec<DbSignetEvent>> {
+        self.acquire().await;
+        self.inner.get_signet_events(spec).await
+    }
+
+    async fn get_zenith_header(
+        &self,
+        spec: ZenithHeaderSpecifier,
+    ) -> ColdResult<Option<DbZenithHeader>> {
+        self.acquire().await;
+        self.inner.get_zenith_header(spec).await
+    }
+
+    async fn get_zenith_headers(
+        &self,
+        spec: ZenithHeaderSpecifier,
+    ) -> ColdResult<Vec<DbZenithHeader>> {
+        self.acquire().await;
+        self.inner.get_zenith_headers(spec).await
+    }
+
+    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
+        self.acquire().await;
+        self.inner.get_latest_block().await
+    }
+
+    async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
+        self.acquire().await;
+        self.inner.get_logs(filter, max_logs).await
+    }
+
+    async fn produce_log_stream(&self, filter: &Filter, params: StreamParams) {
+        produce_log_stream_default(self, filter, params).await;
+    }
+}
+
+impl<B: ColdStorageWrite> ColdStorageWrite for GatedBackend<B> {
+    async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
+        self.inner.append_block(data).await
+    }
+
+    async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
+        self.inner.append_blocks(data).await
+    }
+
+    async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
+        self.inner.truncate_above(block).await
+    }
+}
+
+impl<B: ColdStorage> ColdStorage for GatedBackend<B> {}
+
 /// Regression test for the read-arm backpressure deadlock.
 ///
 /// Prior to the semaphore-based backpressure fix, the run loop used
@@ -38,14 +192,11 @@ async fn reads_above_concurrency_cap_do_not_deadlock() {
     let cancel = CancellationToken::new();
     let handle = ColdStorageTask::spawn(backend, cancel.clone());
 
-    // Seed a handful of blocks so the reads have something to find.
     handle.append_blocks((1..=8).map(block).collect()).await.unwrap();
 
     let result = timeout(DEADLOCK_GUARD, async {
         let reader = handle.reader();
         let mut set = tokio::task::JoinSet::new();
-        // 4× the 64-reader concurrency cap — enough to reach the previously
-        // deadlocking path many times over.
         for i in 0..256u64 {
             let reader = reader.clone();
             set.spawn(async move { reader.get_header_by_number(1 + (i % 8)).await });
@@ -87,9 +238,6 @@ async fn write_after_saturating_reads_makes_progress() {
         set.spawn(async move { reader.get_header_by_number(1 + (i % 8)).await });
     }
 
-    // Issue a write while the readers are still queuing.
With the fix - // it completes as soon as in-flight readers drain; without it, it - // never dispatches because the run loop is wedged. handle.append_blocks(vec![block(9)]).await.unwrap(); while let Some(res) = set.join_next().await { @@ -102,3 +250,248 @@ async fn write_after_saturating_reads_makes_progress() { cancel.cancel(); } + +/// A write queued behind saturated readers completes before any readers +/// that arrive after it. Relies on tokio's FIFO semaphore fairness. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn fairness_write_serves_before_later_readers() { + let mem = MemColdBackend::new(); + let backend = GatedBackend::new(mem, 0); + let gate = backend.gate(); + let entered = Arc::clone(&backend.reads_entered); + + let cancel = CancellationToken::new(); + let (task, handle) = ColdStorageTask::new(backend, cancel.clone()); + let task_handle = tokio::spawn(task.run()); + + // Seed via a bypass so we don't have to wait on the gate for setup. + // append_blocks is a write — not gated. + handle.append_blocks((1..=8).map(block).collect()).await.unwrap(); + + timeout(DEADLOCK_GUARD, async { + // First wave: 64 readers that will saturate the in-flight cap and + // park on the gate. + let reader = handle.reader(); + let mut first_wave = tokio::task::JoinSet::new(); + for i in 0..64u64 { + let reader = reader.clone(); + first_wave.spawn(async move { reader.get_header_by_number(1 + (i % 8)).await }); + } + + // Wait until all 64 have reached the gate. + while entered.load(Ordering::SeqCst) < 64 { + tokio::task::yield_now().await; + } + + // Writer queues behind the 64 saturated readers (it needs all + // 64 permits). Its completion is ordered before the second wave + // by FIFO fairness on the semaphore. + let write_done = Arc::new(tokio::sync::Notify::new()); + let write_done_tx = Arc::clone(&write_done); + let handle_for_write = handle.clone(); + let write_task = tokio::spawn(async move { + handle_for_write.append_blocks(vec![block(9)]).await.unwrap(); + write_done_tx.notify_one(); + }); + + // Give the writer a moment to enter its drain. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Second wave: 64 readers queued *after* the writer. + let mut second_wave = tokio::task::JoinSet::new(); + for i in 0..64u64 { + let reader = reader.clone(); + second_wave.spawn(async move { reader.get_header_by_number(1 + (i % 8)).await }); + } + + // Release all gate permits. Let every queued reader proceed. + // First-wave readers finish (releasing their cold semaphore + // permits); writer accumulates; writer completes; second-wave + // readers then acquire permits and run. + gate.add_permits(256); + + // Writer must complete before any second-wave reader finishes. + let write_first = tokio::select! { + biased; + _ = write_done.notified() => true, + Some(_) = second_wave.join_next() => false, + }; + assert!(write_first, "second-wave reader completed before queued write"); + + // Drain everything. + write_task.await.expect("write task panicked"); + while first_wave.join_next().await.is_some() {} + while second_wave.join_next().await.is_some() {} + }) + .await + .expect("fairness test timed out"); + + cancel.cancel(); + let _ = timeout(DEADLOCK_GUARD, task_handle).await; +} + +/// Cancellation must cut through reader backpressure: a reader parked +/// waiting for a permit cannot keep the task alive. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn cancel_during_reader_backpressure_shuts_down() { + let mem = MemColdBackend::new(); + let backend = GatedBackend::new(mem, 0); + let entered = Arc::clone(&backend.reads_entered); + + let cancel = CancellationToken::new(); + let (task, handle) = ColdStorageTask::new(backend, cancel.clone()); + let task_handle = tokio::spawn(task.run()); + + handle.append_blocks((1..=4).map(block).collect()).await.unwrap(); + + // Saturate the in-flight cap: all 64 readers park on the gate. + let reader = handle.reader(); + let mut set = tokio::task::JoinSet::new(); + for i in 0..64u64 { + let reader = reader.clone(); + set.spawn(async move { reader.get_header_by_number(1 + (i % 4)).await }); + } + + while entered.load(Ordering::SeqCst) < 64 { + tokio::task::yield_now().await; + } + + // A 65th read queues on the cold semaphore (not yet in the backend). + let reader_65 = reader.clone(); + let waiter = tokio::spawn(async move { reader_65.get_header_by_number(1).await }); + + // Now cancel. + cancel.cancel(); + + // Task must shut down within the deadlock guard. The stuck readers + // are released by the 5s read deadline. + timeout(DEADLOCK_GUARD, task_handle).await.expect("task did not shut down").unwrap(); + + // The 65th read either sees Cancelled or TaskTerminated — both are + // acceptable outcomes of shutdown. + let res = waiter.await.expect("waiter panicked"); + assert!( + matches!(res, Err(ColdStorageError::Cancelled | ColdStorageError::TaskTerminated)), + "unexpected waiter result: {res:?}" + ); + + drop(set); +} + +/// Cancellation must cut through the write drain: a stuck reader +/// cannot block shutdown by holding a permit the writer is waiting on. +/// +/// Without the cancel-select around the writer's `acquire_many_owned`, +/// this test hits [`DEADLOCK_GUARD`]. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn cancel_during_write_drain_shuts_down() { + let mem = MemColdBackend::new(); + let backend = GatedBackend::new(mem, 0); + let entered = Arc::clone(&backend.reads_entered); + + let cancel = CancellationToken::new(); + let (task, handle) = ColdStorageTask::new(backend, cancel.clone()); + let task_handle = tokio::spawn(task.run()); + + handle.append_blocks((1..=4).map(block).collect()).await.unwrap(); + + // Saturate the in-flight cap. + let reader = handle.reader(); + let mut set = tokio::task::JoinSet::new(); + for i in 0..64u64 { + let reader = reader.clone(); + set.spawn(async move { reader.get_header_by_number(1 + (i % 4)).await }); + } + + while entered.load(Ordering::SeqCst) < 64 { + tokio::task::yield_now().await; + } + + // Queue a write. It'll enter the drain and block on acquire_many. + let handle_for_write = handle.clone(); + let write_waiter = + tokio::spawn(async move { handle_for_write.append_blocks(vec![block(5)]).await }); + + // Give the writer a moment to start draining. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Cancel while the writer is blocked inside acquire_many. + cancel.cancel(); + + // Task must shut down within the guard. Without the cancel-select + // around acquire_many_owned, the writer never sees the cancel. + timeout(DEADLOCK_GUARD, task_handle).await.expect("task did not shut down").unwrap(); + + // The aborted write receives Cancelled (its oneshot sender was + // dropped when the writer task exited). 
+    let write_res = write_waiter.await.expect("write waiter panicked");
+    assert!(
+        matches!(write_res, Err(ColdStorageError::Cancelled | ColdStorageError::TaskTerminated)),
+        "unexpected write result: {write_res:?}"
+    );
+
+    drop(set);
+}
+
+/// A read that exceeds the operation deadline returns
+/// `ColdStorageError::Timeout` and releases its permit so subsequent
+/// reads can proceed.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn operation_deadline_releases_permit() {
+    let mem = MemColdBackend::new();
+    let backend = GatedBackend::new(mem, 0);
+    let gate = backend.gate();
+    let entered = Arc::clone(&backend.reads_entered);
+
+    let cancel = CancellationToken::new();
+    let (task, handle) = ColdStorageTask::new(backend, cancel.clone())
+        .map_first(|t| t.with_read_deadline(Duration::from_millis(200)));
+    let task_handle = tokio::spawn(task.run());
+
+    handle.append_blocks((1..=4).map(block).collect()).await.unwrap();
+
+    let reader = handle.reader();
+
+    // First read: hits the gate, times out after 200ms.
+    let first = tokio::spawn({
+        let reader = reader.clone();
+        async move { reader.get_header_by_number(1).await }
+    });
+
+    while entered.load(Ordering::SeqCst) < 1 {
+        tokio::task::yield_now().await;
+    }
+
+    // Wait long enough for the deadline to fire.
+    let res = timeout(Duration::from_secs(2), first).await.expect("first read stuck");
+    let res = res.expect("first read panicked");
+    assert!(matches!(res, Err(ColdStorageError::Timeout)), "expected Timeout, got {res:?}");
+
+    // Second read: the permit from the timed-out first read must be
+    // back in the pool. Open the gate so this one succeeds.
+    gate.add_permits(1);
+    let second = timeout(Duration::from_secs(2), reader.get_header_by_number(1))
+        .await
+        .expect("second read stuck");
+    second.expect("second read failed");
+
+    cancel.cancel();
+    let _ = timeout(DEADLOCK_GUARD, task_handle).await;
+}
+
+/// Helper to transform `(Task, Handle)` returned by `ColdStorageTask::new`
+/// by applying a function to the task without touching the handle.
+trait MapFirst<A, B> {
+    fn map_first<A2, F>(self, f: F) -> (A2, B)
+    where
+        F: FnOnce(A) -> A2;
+}
+
+impl<A, B> MapFirst<A, B> for (A, B) {
+    fn map_first<A2, F>(self, f: F) -> (A2, B)
+    where
+        F: FnOnce(A) -> A2,
+    {
+        (f(self.0), self.1)
+    }
+}

From 2170812dcb7f2f92f4bcd5028520d3502a82a169 Mon Sep 17 00:00:00 2001
From: James
Date: Fri, 17 Apr 2026 00:09:42 -0400
Subject: [PATCH 2/2] test(storage): poll for cold catch-up after dispatch in
 unified tests
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`UnifiedStorage::append_blocks` dispatches to cold asynchronously.
With the cold task's dispatcher and writer now running on separate
subtasks, there is no biased ordering between a fire-and-forget write
and a subsequent read — the tests that assumed one were relying on an
implementation detail that production code already polls around (see
`components/crates/node-tests/src/context.rs`).

Add a `wait_for_cold_height` helper matching the production pattern
and use it in the two tests that issued a read immediately after
`append_blocks`.
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/storage/tests/unified.rs | 31 +++++++++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/crates/storage/tests/unified.rs b/crates/storage/tests/unified.rs
index 002157c..1d0f3f8 100644
--- a/crates/storage/tests/unified.rs
+++ b/crates/storage/tests/unified.rs
@@ -2,17 +2,37 @@
 
 use alloy::{
     consensus::{Header, Sealable, Signed, TxLegacy, transaction::Recovered},
-    primitives::{Address, B256, Signature, TxKind, U256},
+    primitives::{Address, B256, BlockNumber, Signature, TxKind, U256},
 };
-use signet_cold::{ColdStorageTask, HeaderSpecifier, mem::MemColdBackend};
+use signet_cold::{ColdStorageReadHandle, ColdStorageTask, HeaderSpecifier, mem::MemColdBackend};
 use signet_hot::{HistoryRead, HistoryWrite, HotKv, mem::MemKv, model::HotKvWrite};
 use signet_storage::UnifiedStorage;
 use signet_storage_types::{
     ExecutedBlock, ExecutedBlockBuilder, Receipt, RecoveredTx, SealedHeader, TransactionSigned,
 };
+use std::time::Duration;
 use tokio_util::sync::CancellationToken;
 use trevm::revm::database::BundleState;
 
+/// Poll cold storage until its latest block reaches `target`, or the
+/// deadline elapses. `UnifiedStorage::append_blocks` dispatches to
+/// cold asynchronously, so callers that need deterministic reads must
+/// wait before querying. See
+/// `components/crates/node-tests/src/context.rs` for the production
+/// version of this pattern.
+async fn wait_for_cold_height(cold: &ColdStorageReadHandle, target: BlockNumber) {
+    tokio::time::timeout(Duration::from_secs(5), async {
+        loop {
+            match cold.get_latest_block().await {
+                Ok(Some(latest)) if latest >= target => break,
+                _ => tokio::task::yield_now().await,
+            }
+        }
+    })
+    .await
+    .unwrap_or_else(|_| panic!("cold storage did not reach height {target} within 5s"));
+}
+
 /// Build a chain of blocks with valid parent hash linkage.
 fn make_chain(count: u64) -> Vec<ExecutedBlock> {
     let mut blocks = Vec::with_capacity(count as usize);
@@ -93,7 +113,9 @@ async fn append_and_read_back() {
     let tip = reader.get_chain_tip().unwrap();
     assert_eq!(tip, Some((0, expected_hash)));
 
-    // Verify cold storage has the header
+    // Wait for cold to absorb the dispatched write, then verify cold
+    // storage has the header.
+    wait_for_cold_height(&cold_handle.reader(), 0).await;
     let header = cold_handle.get_header(HeaderSpecifier::Number(0)).await.unwrap();
     assert!(header.is_some());
 
@@ -156,8 +178,9 @@ async fn drain_above_empty_when_at_tip() {
     let cold_handle = ColdStorageTask::spawn(MemColdBackend::new(), cancel.clone());
     let storage = UnifiedStorage::new(hot.clone(), cold_handle);
 
-    // Append 2 blocks (0, 1)
+    // Append 2 blocks (0, 1) and wait for cold to catch up.
     storage.append_blocks(make_chain_with_txs(2, 1)).unwrap();
+    wait_for_cold_height(&storage.cold().reader(), 1).await;
 
     // drain_above(1) — nothing above tip
     let drained = storage.drain_above(1).await.unwrap();