From f8e1da8f8ccafc408bf81986dc7b8ac5ed43f25e Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 16 Jun 2026 10:54:52 +0200 Subject: [PATCH 1/4] persist: deasync shard_source operators Convert shard_source_descs and shard_source_fetch from AsyncOperatorBuilder to synchronous OperatorBuilderRc operators paired with tokio tasks that own the persist I/O (reader/snapshot/listen and batch fetching). This drops the persist source's dependence on the timely async bridge with no behavior change. * shard_source_descs runs a listen task on the chosen worker that sends parts (split into ExchangeableBatchPart + Lease) and progress over a channel; the operator downgrades capabilities and parks leases. The former shard_source_descs_return operator is merged in as a disconnected completed_fetches input, and the listen handle (the reader's SeqNo hold) is released via a oneshot once that frontier empties. * shard_source_fetch forwards descs to a fetch task and retains a per-flight capability pair; results are matched FIFO, emitted at the data capability, with the completed-fetches capability dropped to release the lease. On a missing blob it reports through the ErrorHandler and freezes, retaining capabilities (and crucially ceasing to drain results, so a later good result cannot advance the frontier past the missing part). * Both operators reproduce builder_async's two-phase shutdown via build_reschedule + the coordinated button, so a local-only press cannot advance the downstream frontier past times other workers still feed. Adds ErrorHandler::report_and_freeze, module documentation of the consumer contract and the operator/task architecture, and regression tests for end-to-end fetch, mid-stream shutdown, listing-path error freeze, and fetch-path error freeze. Co-Authored-By: Claude Opus 4.8 --- .../src/operators/shard_source.rs | 1373 +++++++++++++---- 1 file changed, 1061 insertions(+), 312 deletions(-) diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index ac4cbfa165be4..5efa2680f0ae6 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -8,10 +8,117 @@ // by the Apache License, Version 2.0. //! A source that reads from a persist shard. +//! +//! # For consumers +//! +//! [shard_source] renders a set of timely operators that continuously read a +//! persist shard and emit its contents as a stream of [FetchedBlob]s: parts +//! that have been downloaded from blob storage but not yet decoded. Callers +//! decode them (e.g. via [FetchedBlob::parse]) downstream, typically on the +//! same worker, so that decode cost scales with the number of workers. +//! +//! The source observes the following contract: +//! +//! * **Times**: all emitted times are advanced by the given `as_of`. With +//! [SnapshotMode::Include] the shard contents as of `as_of` are emitted at +//! the `as_of`; with [SnapshotMode::Exclude] only subsequent updates are. +//! * **Frontier**: the output frontier tracks the shard's `upper`, eagerly +//! downgraded to the `as_of` before the snapshot is available so downstream +//! consumers (e.g. `persist_sink`) can rely on close frontier tracking. When +//! `until` is non-empty, parts lying entirely at or beyond `until` are +//! dropped and the source eventually completes; fine-grained filtering of +//! individual updates against `until` is the caller's responsibility. +//! * **Distribution**: parts are distributed across all workers, regardless of +//! which worker coordinates the read. +//! * **Filter pushdown**: `filter_fn` is consulted with each part's stats and +//! may keep the part, discard it without fetching, or replace its contents +//! with a single-row constant ([FilterResult]). A random sample of discarded +//! parts is fetched anyway to audit the decision. +//! * **Errors**: conditions that are neither data-plane errors nor bugs (an +//! unserveable `as_of`, a missing blob after a lease timeout) are reported to +//! the given [ErrorHandler]. The source then freezes: it stops doing work but +//! retains its capabilities, so the frontier never advances past unproduced +//! data while a halt or dataflow restart is pending. +//! * **Shutdown**: dropping the returned [PressOnDropButton] tokens shuts the +//! source down, dropping all held capabilities and abandoning in-flight work. +//! +//! # For implementors +//! +//! The source is split into two synchronous timely operators, each paired with +//! a tokio task that owns all async persist work. Operators and tasks +//! communicate over channels; tasks wake their operator through a +//! [SyncActivator]-backed [ArcActivator], which also unparks a parked worker. +//! +//! ```text +//! tokio: listen task ──(parts+leases, progress, mpsc)──> [shard_source_descs] +//! (chosen worker only) │ exchange by assigned worker +//! ▲ oneshot: drop listen ▼ +//! └──────────────────────────────────────────── [shard_source_fetch] (per worker) +//! completed_fetches feedback │ ▲ +//! desc │ │ FetchedBlob +//! (mpsc) ▼ │ (mpsc) +//! tokio: fetch task (per worker) +//! ``` +//! +//! **`shard_source_descs`** runs on all workers, but only the chosen worker +//! (hash of the name) spawns a listen task and holds capabilities. The task +//! opens a leased reader, waits for the `start_signal`, resolves the `as_of`, +//! and walks snapshot + listen, applying `filter_fn` and the audit budget. It +//! splits each [LeasedBatchPart] into an [ExchangeableBatchPart] plus its +//! [Lease] and sends both to the operator, along with progress updates that +//! drive the operator's capability downgrades. The operator emits +//! `(worker_idx, part)` pairs — exchanged by index — and parks each lease in a +//! `LeaseManager` keyed by the part's time. The lease-return input (formerly +//! the separate `shard_source_descs_return` operator) is merged in as a +//! disconnected `completed_fetches` input. +//! +//! **`shard_source_fetch`** forwards each incoming desc to its fetch task and +//! retains a *pair* of capabilities for it: one for the data output, one for +//! the `completed_fetches` output. The task downloads parts in order; the +//! operator matches results to capabilities FIFO, emits the [FetchedBlob] at +//! the first capability, and drops the second. +//! +//! **Lease lifecycle**: dropping the completed-fetches capability advances a +//! feedback loop back into `shard_source_descs`, whose frontier advances the +//! `LeaseManager` and drops the leases for fetched parts. The listen task — and +//! with it the reader and its SeqNo hold, which is what actually protects +//! unfetched parts' blobs from GC — must outlive all in-flight fetches: the +//! operator releases it through a oneshot only once the completed-fetches +//! frontier is empty. If the dataflow is dropped instead, the tasks are aborted +//! via their [AbortOnDropHandle]s. +//! +//! **Two-phase shutdown**: both operators return a [PressOnDropButton] and so +//! participate in `builder_async`'s coordinated shutdown. Their schedule +//! closures use `build_reschedule` and only drop capabilities / drain inputs +//! once *all* workers have pressed (`all_pressed`). Dropping capabilities on a +//! local-only press would let the downstream frontier advance past times that +//! other workers' instances still feed (cross-worker teardown skew). +//! +//! Subtleties worth knowing before changing this code: +//! +//! * [LeasedBatchPart]s panic on drop while leased; only the lease-split +//! [ExchangeableBatchPart] representation may cross channels, where it can be +//! dropped harmlessly on shutdown. +//! * The fetch task processes descs strictly in order; the FIFO capability +//! matching in the fetch operator is only sound because of this. Once a fetch +//! fails the operator freezes and must STOP draining results — a later, good +//! result would otherwise pop the failed part's capability and advance the +//! frontier past data never emitted. +//! * The `completed_fetches` feedback edge carries no data (`Infallible`); it +//! exists for its frontier, which signals cross-process fetch completion, +//! wakes the descs operator, and keeps that operator alive (and thus the +//! listen task's SeqNo hold) until all fetches finish. +//! * Tests that step workers manually must not park indefinitely while polling +//! state that does not activate the worker: with the listen in a task, a +//! caught-up source with no listen retry timer produces no activations. +//! +//! [SyncActivator]: timely::scheduling::SyncActivator +//! [ArcActivator]: mz_timely_util::activator::ArcActivator +//! [LeasedBatchPart]: crate::fetch::LeasedBatchPart +//! [AbortOnDropHandle]: mz_ore::task::AbortOnDropHandle -use std::cell::RefCell; -use std::collections::BTreeMap; use std::collections::hash_map::DefaultHasher; +use std::collections::{BTreeMap, VecDeque}; use std::convert::Infallible; use std::fmt::{Debug, Formatter}; use std::future::Future; @@ -28,25 +135,26 @@ use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; use futures_util::StreamExt; use mz_ore::cast::CastFrom; -use mz_ore::collections::CollectionExt; use mz_persist_types::stats::PartStats; use mz_persist_types::{Codec, Codec64}; -use mz_timely_util::builder_async::{ - Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, -}; +use mz_timely_util::activator::ArcActivator; +use mz_timely_util::builder_async::{PressOnDropButton, button}; use timely::PartialOrder; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{Exchange, Pipeline}; -use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Enter, Feedback, Leave}; +use timely::dataflow::operators::generic::OutputBuilder; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; +use timely::dataflow::operators::{Capability, CapabilitySet, ConnectLoop, Enter, Feedback, Leave}; use timely::dataflow::{Scope, StreamVec}; use timely::order::TotalOrder; use timely::progress::frontier::AntichainRef; use timely::progress::{Antichain, Timestamp, timestamp::Refines}; +use tokio::sync::mpsc::error::TryRecvError; use tracing::{debug, trace}; use crate::batch::BLOB_TARGET_SIZE; use crate::cfg::{RetryParameters, USE_CRITICAL_SINCE_SOURCE}; use crate::fetch::{ExchangeableBatchPart, FetchedBlob, Lease}; +use crate::internal::paths::BlobKey; use crate::internal::state::BatchPart; use crate::stats::{STATS_AUDIT_PERCENT, STATS_FILTER_ENABLED}; use crate::{Diagnostics, PersistClient, ShardId}; @@ -109,6 +217,19 @@ impl ErrorHandler { Self::Signal(Rc::new(signal_fn)) } + /// Signal an error to an error handler from a synchronous operator. For [ErrorHandler::Halt] + /// this never returns; for [ErrorHandler::Signal] it returns after invoking the callback, and + /// the caller is responsible for "freezing": retaining its capabilities and doing no further + /// work, so that no spurious progress is observable while a restart is pending. + pub fn report_and_freeze(&self, error: anyhow::Error) { + match self { + ErrorHandler::Halt(name) => { + mz_ore::halt!("unhandled error in {name}: {error:#}") + } + ErrorHandler::Signal(callback) => callback(error), + } + } + /// Signal an error to an error handler. This function never returns: logically it blocks until /// restart, though that restart might be sooner (if halting) or later (if triggering a dataflow /// restart, for example). @@ -150,10 +271,10 @@ pub fn shard_source<'inner, 'outer, K, V, T, D, DT, TOuter, C>( desc_transformer: Option
, key_schema: Arc, val_schema: Arc, - filter_fn: impl FnMut(&PartStats, AntichainRef) -> FilterResult + 'static, + filter_fn: impl FnMut(&PartStats, AntichainRef) -> FilterResult + Send + 'static, // If Some, an override for the default listen sleep retry parameters. - listen_sleep: Option RetryParameters + 'static>, - start_signal: impl Future + 'static, + listen_sleep: Option RetryParameters + Send + 'static>, + start_signal: impl Future + Send + 'static, error_handler: ErrorHandler, ) -> ( StreamVec<'inner, T, FetchedBlob>, @@ -180,16 +301,11 @@ where // [`LeasedBatchPart`] and [`Subscribe`] or will likely run into intentional // panics. // - // This source is split as such: - // 1. Sets up `async_stream`, which only yields data (parts) on one chosen - // worker. Generating also generates SeqNo leases on the chosen worker, - // ensuring `part`s do not get GCed while in flight. - // 2. Part distribution: A timely source operator which continuously reads - // from that stream, and distributes the data among workers. - // 3. Part fetcher: A timely operator which downloads the part's contents - // from S3, and outputs them to a timely stream. Additionally, the - // operator returns the `LeasedBatchPart` to the original worker, so it - // can release the SeqNo lease. + // See the module documentation for the structure of this source: a descs + // operator (with a listen task minting parts and leases on one chosen + // worker) distributing parts to a fetch operator (with a fetch task + // downloading their contents) on each worker, with a feedback loop of + // completed fetches that releases the leases. let chosen_worker = usize::cast_from(name.hashed()) % scope.peers(); @@ -291,6 +407,22 @@ impl LeaseManager { } } +/// A message from the listen task to the [shard_source_descs] operator. +enum ListenMessage { + /// The resolved `as_of`; the operator downgrades its capabilities to it. + AsOf(Antichain), + /// Parts minted at `ts`, each with the lease that protects it from GC. + Parts { + ts: T, + parts: Vec<(usize, ExchangeableBatchPart, Lease)>, + }, + /// Listen progress; the operator downgrades its capabilities. The empty + /// antichain indicates the listen is complete (`until` was reached). + Progress(Antichain), + /// A fatal error; the operator reports it and freezes. + Error(anyhow::Error), +} + pub(crate) fn shard_source_descs<'outer, K, V, D, TOuter>( scope: Scope<'outer, TOuter>, name: &str, @@ -303,10 +435,10 @@ pub(crate) fn shard_source_descs<'outer, K, V, D, TOuter>( chosen_worker: usize, key_schema: Arc, val_schema: Arc, - mut filter_fn: impl FnMut(&PartStats, AntichainRef) -> FilterResult + 'static, + mut filter_fn: impl FnMut(&PartStats, AntichainRef) -> FilterResult + Send + 'static, // If Some, an override for the default listen sleep retry parameters. - listen_sleep: Option RetryParameters + 'static>, - start_signal: impl Future + 'static, + listen_sleep: Option RetryParameters + Send + 'static>, + start_signal: impl Future + Send + 'static, error_handler: ErrorHandler, ) -> ( StreamVec<'outer, TOuter, (usize, ExchangeableBatchPart)>, @@ -321,74 +453,53 @@ where { let worker_index = scope.index(); let num_workers = scope.peers(); - - // This is a generator that sets up an async `Stream` that can be continuously polled to get the - // values that are `yield`-ed from it's body. let name_owned = name.to_owned(); - // Create a shared slot between the operator to store the listen handle - let listen_handle = Rc::new(RefCell::new(None)); - let return_listen_handle = Rc::clone(&listen_handle); - - // Create a oneshot channel to give the part returner a SubscriptionLeaseReturner - let (tx, rx) = tokio::sync::oneshot::channel::>>>(); - let mut builder = AsyncOperatorBuilder::new( - format!("shard_source_descs_return({})", name), - scope.clone(), - ); - let mut completed_fetches = builder.new_disconnected_input(completed_fetches_stream, Pipeline); - // This operator doesn't need to use a token because it naturally exits when its input - // frontier reaches the empty antichain. - builder.build(move |_caps| async move { - let Ok(leases) = rx.await else { - // Either we're not the chosen worker or the dataflow was shutdown before the - // subscriber was even created. - return; - }; - while let Some(event) = completed_fetches.next().await { - let Event::Progress(frontier) = event else { - continue; - }; - leases.borrow_mut().advance_to(frontier.borrow()); - } - // Make it explicit that the subscriber is kept alive until we have finished returning parts - drop(return_listen_handle); - }); - let mut builder = - AsyncOperatorBuilder::new(format!("shard_source_descs({})", name), scope.clone()); - let (descs_output, descs_stream) = builder.new_output::>(); - - #[allow(clippy::await_holding_refcell_ref)] - let shutdown_button = builder.build(move |caps| async move { - let mut cap_set = CapabilitySet::from_elem(caps.into_element()); - - // Only one worker is responsible for distributing parts - if worker_index != chosen_worker { - trace!( - "We are not the chosen worker ({}), exiting...", - chosen_worker - ); - return; - } - - // Internally, the `open_leased_reader` call registers a new LeasedReaderId and then fires - // up a background tokio task to heartbeat it. It is possible that we might get a - // particularly adversarial scheduling where the CRDB query to register the id is sent and - // then our Future is not polled again for a long time, resulting is us never spawning the - // heartbeat task. Run reader creation in a task to attempt to defend against this. - // - // TODO: Really we likely need to swap the inners of all persist operators to be - // communicating with a tokio task over a channel, but that's much much harder, so for now - // we whack the moles as we see them. - let mut read = mz_ore::task::spawn(|| format!("shard_source_reader({})", name_owned), { - let diagnostics = Diagnostics { - handle_purpose: format!("shard_source({})", name_owned), - shard_name: name_owned.clone(), + OperatorBuilderRc::new(format!("shard_source_descs({})", name), scope.clone()); + let info = builder.operator_info(); + // NB: create the output before the input, so that the input's explicit + // empty connection below disconnects it from the right output port. + let (descs_output, descs_stream) = + builder.new_output::)>>(); + let mut descs_output = OutputBuilder::from(descs_output); + // The completed fetches input is disconnected from the output: it only + // drives lease returns (merging in what used to be the separate + // `shard_source_descs_return` operator), not output progress. + let mut completed_fetches = + builder.new_input_connection(completed_fetches_stream, Pipeline, []); + + // Only the chosen worker produces parts. It spawns a listen task that owns + // all async work (reader, snapshot, listen loop, stats-based filtering) and + // communicates with the operator over a channel. The other workers build + // the same operator shape but hold no capabilities. + let chosen_state = (worker_index == chosen_worker).then(|| { + let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel::>(); + // Fired by the operator once the completed-fetches frontier is empty, + // i.e. all fetches are done. The task holds the listen handle (and with + // it the reader's seqno hold) until then. + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (activator, activation_ack) = ArcActivator::new(scope.clone(), &info); + + let task = mz_ore::task::spawn(|| format!("shard_source_descs({})", name_owned), { + let name = name_owned.clone(); + // Report a fatal error to the operator and stop the task. + let error = |e: anyhow::Error, + msg_tx: &tokio::sync::mpsc::UnboundedSender>, + activator: &ArcActivator| { + let _ = msg_tx.send(ListenMessage::Error(e)); + activator.activate(); }; async move { + // Internally, the `open_leased_reader` call registers a new LeasedReaderId and + // then fires up a background tokio task to heartbeat it. Since we are already + // running inside a task here, the heartbeat task is spawned promptly. let client = client.await; - client + let diagnostics = Diagnostics { + handle_purpose: format!("shard_source({})", name), + shard_name: name.clone(), + }; + let mut read = client .open_leased_reader::( shard_id, key_schema, @@ -397,184 +508,318 @@ where USE_CRITICAL_SINCE_SOURCE.get(client.dyncfgs()), ) .await - } - }) - .await - .expect("could not open persist shard"); - - // Wait for the start signal only after we have obtained a read handle. This makes "cannot - // serve requested as_of" panics caused by (database-issues#8729) significantly less - // likely. - let () = start_signal.await; - - let cfg = read.cfg.clone(); - let metrics = Arc::clone(&read.metrics); - - let as_of = as_of.unwrap_or_else(|| read.since().clone()); - - // Eagerly downgrade our frontier to the initial as_of. This makes sure - // that the output frontier of the `persist_source` closely tracks the - // `upper` frontier of the persist shard. It might be that the snapshot - // for `as_of` is not initially available yet, but this makes sure we - // already downgrade to it. - // - // Downstream consumers might rely on close frontier tracking for making - // progress. For example, the `persist_sink` needs to know the - // up-to-date upper of the output shard to make progress because it will - // only write out new data once it knows that earlier writes went - // through, including the initial downgrade of the shard upper to the - // `as_of`. - // - // NOTE: We have to do this before our `snapshot()` call because that - // will block when there is no data yet available in the shard. - cap_set.downgrade(as_of.clone()); - - let mut snapshot_parts = - match snapshot_mode { - SnapshotMode::Include => match read.snapshot(as_of.clone()).await { - Ok(parts) => parts, - Err(e) => error_handler - .report_and_stop(anyhow!( - "{name_owned}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}" - )) - .await, - }, - SnapshotMode::Exclude => vec![], - }; - - // We're about to start producing parts to be fetched whose leases will be returned by the - // `shard_source_descs_return` operator above. In order for that operator to successfully - // return the leases we send it the lease returner associated with our shared subscriber. - let leases = Rc::new(RefCell::new(LeaseManager::new())); - tx.send(Rc::clone(&leases)) - .expect("lease returner exited before desc producer"); - - // Store the listen handle in the shared slot so that it stays alive until both operators - // exit - let mut listen = listen_handle.borrow_mut(); - let listen = match read.listen(as_of.clone()).await { - Ok(handle) => listen.insert(handle), - Err(e) => { - error_handler - .report_and_stop(anyhow!( - "{name_owned}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}" - )) - .await - } - }; + .expect("could not open persist shard"); - let listen_retry = listen_sleep.as_ref().map(|retry| retry()); + // Wait for the start signal only after we have obtained a read handle. This + // makes "cannot serve requested as_of" panics caused by (database-issues#8729) + // significantly less likely. + let () = start_signal.await; - // The head of the stream is enriched with the snapshot parts if they exist - let listen_head = if !snapshot_parts.is_empty() { - let (mut parts, progress) = listen.next(listen_retry).await; - snapshot_parts.append(&mut parts); - futures::stream::iter(Some((snapshot_parts, progress))) - } else { - futures::stream::iter(None) - }; + let cfg = read.cfg.clone(); + let metrics = Arc::clone(&read.metrics); - // The tail of the stream is all subsequent parts - let listen_tail = futures::stream::unfold(listen, |listen| async move { - Some((listen.next(listen_retry).await, listen)) - }); + let as_of = as_of.unwrap_or_else(|| read.since().clone()); - let mut shard_stream = pin!(listen_head.chain(listen_tail)); - - // Ideally, we'd like our audit overhead to be proportional to the actual amount of "real" - // work we're doing in the source. So: start with a small, constant budget; add to the - // budget when we do real work; and skip auditing a part if we don't have the budget for it. - let mut audit_budget_bytes = u64::cast_from(BLOB_TARGET_SIZE.get(&cfg).saturating_mul(2)); - - // All future updates will be timestamped after this frontier. - let mut current_frontier = as_of.clone(); - - // If `until.less_equal(current_frontier)`, it means that all subsequent batches will contain only - // times greater or equal to `until`, which means they can be dropped in their entirety. - while !PartialOrder::less_equal(&until, ¤t_frontier) { - let (parts, progress) = shard_stream.next().await.expect("infinite stream"); - - // Emit the part at the `(ts, 0)` time. The `granular_backpressure` - // operator will refine this further, if its enabled. - let current_ts = current_frontier - .as_option() - .expect("until should always be <= the empty frontier"); - let session_cap = cap_set.delayed(current_ts); - - for mut part_desc in parts { - // TODO: Push more of this logic into LeasedBatchPart like we've - // done for project? - if STATS_FILTER_ENABLED.get(&cfg) { - let filter_result = match &part_desc.part { - BatchPart::Hollow(x) => { - let should_fetch = - x.stats.as_ref().map_or(FilterResult::Keep, |stats| { - filter_fn(&stats.decode(), current_frontier.borrow()) - }); - should_fetch - } - BatchPart::Inline { .. } => FilterResult::Keep, - }; - // Apply the filter: discard or substitute the part if required. - let bytes = u64::cast_from(part_desc.encoded_size_bytes()); - match filter_result { - FilterResult::Keep => { - audit_budget_bytes = audit_budget_bytes.saturating_add(bytes); + // Eagerly downgrade our frontier to the initial as_of. This makes sure + // that the output frontier of the `persist_source` closely tracks the + // `upper` frontier of the persist shard. It might be that the snapshot + // for `as_of` is not initially available yet, but this makes sure we + // already downgrade to it. + // + // Downstream consumers might rely on close frontier tracking for making + // progress. For example, the `persist_sink` needs to know the + // up-to-date upper of the output shard to make progress because it will + // only write out new data once it knows that earlier writes went + // through, including the initial downgrade of the shard upper to the + // `as_of`. + // + // NOTE: We have to do this before our `snapshot()` call because that + // will block when there is no data yet available in the shard. + if msg_tx.send(ListenMessage::AsOf(as_of.clone())).is_err() { + return; + } + activator.activate(); + + let mut snapshot_parts = match snapshot_mode { + SnapshotMode::Include => match read.snapshot(as_of.clone()).await { + Ok(parts) => parts, + Err(e) => { + error( + anyhow!( + "{name}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}" + ), + &msg_tx, + &activator, + ); + return; } - FilterResult::Discard => { - metrics.pushdown.parts_filtered_count.inc(); - metrics.pushdown.parts_filtered_bytes.inc_by(bytes); - let should_audit = match &part_desc.part { + }, + SnapshotMode::Exclude => vec![], + }; + + let mut listen = match read.listen(as_of.clone()).await { + Ok(handle) => handle, + Err(e) => { + error( + anyhow!( + "{name}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}" + ), + &msg_tx, + &activator, + ); + return; + } + }; + + let listen_retry = listen_sleep.as_ref().map(|retry| retry()); + + // The head of the stream is enriched with the snapshot parts if they exist + let listen_head = if !snapshot_parts.is_empty() { + let (mut parts, progress) = listen.next(listen_retry).await; + snapshot_parts.append(&mut parts); + futures::stream::iter(Some((snapshot_parts, progress))) + } else { + futures::stream::iter(None) + }; + + // The tail of the stream is all subsequent parts + let listen_tail = futures::stream::unfold(&mut listen, |listen| async move { + Some((listen.next(listen_retry).await, listen)) + }); + + let mut shard_stream = pin!(listen_head.chain(listen_tail)); + + // Ideally, we'd like our audit overhead to be proportional to the actual amount + // of "real" work we're doing in the source. So: start with a small, constant + // budget; add to the budget when we do real work; and skip auditing a part if we + // don't have the budget for it. + let mut audit_budget_bytes = + u64::cast_from(BLOB_TARGET_SIZE.get(&cfg).saturating_mul(2)); + + // All future updates will be timestamped after this frontier. + let mut current_frontier = as_of.clone(); + + // If `until.less_equal(current_frontier)`, it means that all subsequent batches + // will contain only times greater or equal to `until`, which means they can be + // dropped in their entirety. + while !PartialOrder::less_equal(&until, ¤t_frontier) { + let (parts, progress) = shard_stream.next().await.expect("infinite stream"); + + let current_ts = current_frontier + .as_option() + .expect("until should always be <= the empty frontier"); + + let mut out = Vec::with_capacity(parts.len()); + for mut part_desc in parts { + // TODO: Push more of this logic into LeasedBatchPart like we've + // done for project? + if STATS_FILTER_ENABLED.get(&cfg) { + let filter_result = match &part_desc.part { BatchPart::Hollow(x) => { - let mut h = DefaultHasher::new(); - x.key.hash(&mut h); - usize::cast_from(h.finish()) % 100 - < STATS_AUDIT_PERCENT.get(&cfg) + let should_fetch = + x.stats.as_ref().map_or(FilterResult::Keep, |stats| { + filter_fn(&stats.decode(), current_frontier.borrow()) + }); + should_fetch } - BatchPart::Inline { .. } => false, + BatchPart::Inline { .. } => FilterResult::Keep, }; - if should_audit && bytes < audit_budget_bytes { - audit_budget_bytes -= bytes; - metrics.pushdown.parts_audited_count.inc(); - metrics.pushdown.parts_audited_bytes.inc_by(bytes); - part_desc.request_filter_pushdown_audit(); + // Apply the filter: discard or substitute the part if required. + let bytes = u64::cast_from(part_desc.encoded_size_bytes()); + match filter_result { + FilterResult::Keep => { + audit_budget_bytes = audit_budget_bytes.saturating_add(bytes); + } + FilterResult::Discard => { + metrics.pushdown.parts_filtered_count.inc(); + metrics.pushdown.parts_filtered_bytes.inc_by(bytes); + let should_audit = match &part_desc.part { + BatchPart::Hollow(x) => { + let mut h = DefaultHasher::new(); + x.key.hash(&mut h); + usize::cast_from(h.finish()) % 100 + < STATS_AUDIT_PERCENT.get(&cfg) + } + BatchPart::Inline { .. } => false, + }; + if should_audit && bytes < audit_budget_bytes { + audit_budget_bytes -= bytes; + metrics.pushdown.parts_audited_count.inc(); + metrics.pushdown.parts_audited_bytes.inc_by(bytes); + part_desc.request_filter_pushdown_audit(); + } else { + debug!( + "skipping part because of stats filter {:?}", + part_desc.part.stats() + ); + continue; + } + } + FilterResult::ReplaceWith { key, val } => { + part_desc.maybe_optimize(&cfg, key, val); + audit_budget_bytes = audit_budget_bytes.saturating_add(bytes); + } + } + let bytes = u64::cast_from(part_desc.encoded_size_bytes()); + if part_desc.part.is_inline() { + metrics.pushdown.parts_inline_count.inc(); + metrics.pushdown.parts_inline_bytes.inc_by(bytes); } else { - debug!( - "skipping part because of stats filter {:?}", - part_desc.part.stats() - ); - continue; + metrics.pushdown.parts_fetched_count.inc(); + metrics.pushdown.parts_fetched_bytes.inc_by(bytes); } } - FilterResult::ReplaceWith { key, val } => { - part_desc.maybe_optimize(&cfg, key, val); - audit_budget_bytes = audit_budget_bytes.saturating_add(bytes); + + // Give the part to a random worker. This isn't round robin in an attempt + // to avoid skew issues: if your parts alternate size large, small, then + // you'll end up only using half of your workers. + // + // There's certainly some other things we could be doing instead here, but + // this has seemed to work okay so far. Continue to revisit as necessary. + let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers; + let (part, lease) = part_desc.into_exchangeable_part(); + out.push((worker_idx, part, lease)); + } + + if !out.is_empty() { + let msg = ListenMessage::Parts { + ts: current_ts.clone(), + parts: out, + }; + if msg_tx.send(msg).is_err() { + return; } } - let bytes = u64::cast_from(part_desc.encoded_size_bytes()); - if part_desc.part.is_inline() { - metrics.pushdown.parts_inline_count.inc(); - metrics.pushdown.parts_inline_bytes.inc_by(bytes); - } else { - metrics.pushdown.parts_fetched_count.inc(); - metrics.pushdown.parts_fetched_bytes.inc_by(bytes); + + current_frontier.join_assign(&progress); + if msg_tx.send(ListenMessage::Progress(progress)).is_err() { + return; } + activator.activate(); } - // Give the part to a random worker. This isn't round robin in an attempt to avoid - // skew issues: if your parts alternate size large, small, then you'll end up only - // using half of your workers. - // - // There's certainly some other things we could be doing instead here, but this has - // seemed to work okay so far. Continue to revisit as necessary. - let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers; - let (part, lease) = part_desc.into_exchangeable_part(); - leases.borrow_mut().push_at(current_ts.clone(), lease); - descs_output.give(&session_cap, (worker_idx, part)); + // Signal completion: all subsequent parts would be filtered by `until`. + let _ = msg_tx.send(ListenMessage::Progress(Antichain::new())); + activator.activate(); + + // Keep the listen handle (and with it the reader's seqno hold, which protects + // the leased parts from GC) alive until the operator signals that all fetches + // have completed; `listen` only drops when this task exits. If the operator is + // dropped instead, this task is aborted and the handles are dropped with it. + let _ = shutdown_rx.await; + } + }) + .abort_on_drop(); + + (msg_rx, Some(shutdown_tx), activation_ack, task) + }); + + let (mut shutdown_handle, shutdown_button) = button(scope, info.address); + + builder.build_reschedule(move |capabilities| { + let [cap]: [_; 1] = capabilities.try_into().expect("one capability per output"); + // Only the chosen worker produces parts; the others hold no + // capabilities. + let mut cap_set = if worker_index == chosen_worker { + CapabilitySet::from_elem(cap) + } else { + trace!( + "We are not the chosen worker ({}), exiting...", + chosen_worker + ); + CapabilitySet::new() + }; + // Leases for parts that have been emitted but whose fetch has not yet + // completed, keyed by the timestamp they were emitted at. Advanced by + // the completed fetches frontier. + let mut leases = LeaseManager::new(); + let mut chosen_state = chosen_state; + // Set once `error_handler` has been notified: the operator stops doing + // work but retains its capabilities so the frontier does not advance. + let mut failed = false; + + move |frontiers| { + // Two-phase shutdown, mirroring `builder_async`: only once *all* + // workers have pressed do we drop capabilities and drain inputs. + // Dropping caps on a local-only press would let the downstream + // frontier advance past times other workers' instances still feed. + if shutdown_handle.local_pressed() { + return if shutdown_handle.all_pressed() { + cap_set = CapabilitySet::new(); + chosen_state = None; + completed_fetches.for_each(|_cap, _data| {}); + false + } else { + // Local press only: wedge. Keep capabilities and leave the + // input undrained so its pending messages hold the frontier. + true + }; + } + + // Drain the completed fetches input. It carries no data + // (`Infallible`); only its frontier matters. + completed_fetches.for_each(|_cap, _data| {}); + + let Some((msg_rx, shutdown_tx, activation_ack, task)) = chosen_state.as_mut() else { + // Non-chosen workers have nothing to do. + return true; + }; + // Keep the listen task alive for as long as the operator runs. + let _ = &task; + activation_ack.ack(); + + // Apply the completed fetches frontier to the leases. + let completed_frontier = frontiers[0].frontier(); + leases.advance_to(completed_frontier); + if completed_frontier.is_empty() { + // All fetches have completed; allow the listen task to drop the + // listen handle and exit. + if let Some(tx) = shutdown_tx.take() { + let _ = tx.send(()); + } + } + + if failed { + // Frozen: retain capabilities so the frontier does not advance. + // Every error path in the listen task sends `Error` as its final + // message and returns, so `msg_rx` is empty forever once we get + // here; no need to keep draining it. + return true; } - current_frontier.join_assign(&progress); - cap_set.downgrade(progress.iter()); + // Drain messages from the listen task. + loop { + match msg_rx.try_recv() { + Ok(ListenMessage::AsOf(as_of)) => { + cap_set.downgrade(as_of.iter()); + } + Ok(ListenMessage::Parts { ts, parts }) => { + let session_cap = cap_set.delayed(&ts); + let mut output = descs_output.activate(); + let mut session = output.session(&session_cap); + for (worker_idx, part, lease) in parts { + leases.push_at(ts.clone(), lease); + session.give((worker_idx, part)); + } + } + Ok(ListenMessage::Progress(progress)) => { + cap_set.downgrade(progress.iter()); + } + Ok(ListenMessage::Error(e)) => { + // Report the error and freeze: capabilities are retained + // so that no spurious progress is observable while a + // restart is pending. + error_handler.report_and_freeze(e); + failed = true; + break; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => break, + } + } + + // Keep the operator alive; it is torn down via the shutdown button. + true } }); @@ -602,82 +847,205 @@ where D: Monoid + Codec64 + Send + Sync, TInner: Timestamp + Refines, { + let scope = descs.scope(); let mut builder = - AsyncOperatorBuilder::new(format!("shard_source_fetch({})", name), descs.scope()); - let (fetched_output, fetched_stream) = builder.new_output::>(); - let (completed_fetches_output, completed_fetches_stream) = - builder.new_output::>>(); - let mut descs_input = builder.new_input_for_many( + OperatorBuilderRc::new(format!("shard_source_fetch({})", name), scope.clone()); + let info = builder.operator_info(); + // NB: create the outputs before the input, so that the input's default + // connection covers both outputs. + let (fetched_output, fetched_stream) = builder.new_output::>>(); + let mut fetched_output = OutputBuilder::from(fetched_output); + let (_completed_fetches_output, completed_fetches_stream) = + builder.new_output::>(); + let mut descs_input = builder.new_input( descs, Exchange::new(|&(i, _): &(usize, _)| u64::cast_from(i)), - [&fetched_output, &completed_fetches_output], ); let name_owned = name.to_owned(); - let shutdown_button = builder.build(move |_capabilities| async move { - let mut fetcher = mz_ore::task::spawn(|| format!("shard_source_fetch({})", name_owned), { - let diagnostics = Diagnostics { - shard_name: name_owned.clone(), - handle_purpose: format!("shard_source_fetch batch fetcher {}", name_owned), - }; - async move { - client - .await - .create_batch_fetcher::( - shard_id, - key_schema, - val_schema, - is_transient, - diagnostics, - ) + // Channels between the operator and the fetch task: descs flow to the task, + // fetch results flow back. On a missing blob the task attaches the minting + // reader's lease diagnostics so the operator can distinguish a lease expiry + // from a GC bug. The task wakes the operator through the activator after + // each result. + let (desc_tx, mut desc_rx) = tokio::sync::mpsc::unbounded_channel::>(); + let (blob_tx, blob_rx) = tokio::sync::mpsc::unbounded_channel::< + Result, (BlobKey, String)>, + >(); + let (activator, activation_ack) = ArcActivator::new(scope.clone(), &info); + + // The fetch task owns the `BatchFetcher` and performs all async work: + // fetcher creation and the per-part downloads. + let task = mz_ore::task::spawn(|| format!("shard_source_fetch({})", name_owned), { + let diagnostics = Diagnostics { + shard_name: name_owned.clone(), + handle_purpose: format!("shard_source_fetch batch fetcher {}", name_owned), + }; + async move { + let mut fetcher = client + .await + .create_batch_fetcher::( + shard_id, + key_schema, + val_schema, + is_transient, + diagnostics, + ) + .await + .expect("shard codecs should not change"); + while let Some(part) = desc_rx.recv().await { + let reader_id = part.reader_id().clone(); + let fetched = fetcher + .fetch_leased_part(part) .await + .expect("shard_id should match across all workers"); + let fetched = match fetched { + Ok(fetched) => Ok(fetched), + Err(blob_key) => { + // Ideally, readers should never encounter a missing blob. They place a + // seqno hold as they consume their snapshot/listen, preventing any blobs + // they need from being deleted by garbage collection, and all blob + // implementations are linearizable so there should be no possibility of + // stale reads. + // + // However, it is possible for a lease to expire given a sustained period + // of downtime, which could allow parts we expect to exist to be + // deleted... at which point our best option is to request a restart. + // Check the state of the minting reader's lease to tell the two cases + // apart. + let diagnostics = fetcher.missing_blob_diagnostics(&reader_id).await; + Err((blob_key, diagnostics)) + } + }; + if blob_tx.send(fetched).is_err() { + // The operator is gone; stop fetching. + return; + } + activator.activate(); } - }) - .await - .expect("shard codecs should not change"); - - while let Some(event) = descs_input.next().await { - if let Event::Data([fetched_cap, _completed_fetches_cap], data) = event { - // `LeasedBatchPart`es cannot be dropped at this point w/o - // panicking, so swap them to an owned version. - for (_idx, part) in data { - let reader_id = part.reader_id().clone(); - let fetched = fetcher - .fetch_leased_part(part) - .await - .expect("shard_id should match across all workers"); - let fetched = match fetched { - Ok(fetched) => fetched, - Err(blob_key) => { - // Ideally, readers should never encounter a missing blob. They place a seqno - // hold as they consume their snapshot/listen, preventing any blobs they need - // from being deleted by garbage collection, and all blob implementations are - // linearizable so there should be no possibility of stale reads. - // - // However, it is possible for a lease to expire given a sustained period of - // downtime, which could allow parts we expect to exist to be deleted... - // at which point our best option is to request a restart. Check the state - // of the minting reader's lease to tell the two cases apart. - let diagnostics = fetcher.missing_blob_diagnostics(&reader_id).await; - error_handler - .report_and_stop(anyhow!( - "batch fetcher could not fetch batch part {}: {}", - blob_key, - diagnostics - )) - .await + } + }) + .abort_on_drop(); + + let (mut shutdown_handle, shutdown_button) = button(scope, info.address); + + builder.build_reschedule(move |_capabilities| { + // Per-flight capabilities, in the order parts were sent to the fetch + // task (which processes and returns them in the same order). The first + // capability emits the fetched blob; dropping the second advances the + // `completed_fetches` frontier, which releases the part's lease on the + // chosen worker. Holding both until the fetch completes keeps the lease + // alive while the download is in flight. + let mut inflight_caps: VecDeque<(Capability, Capability)> = VecDeque::new(); + // Wrapped in `Option` so we can drop the sender to signal the task that + // no more descs are coming. + let mut desc_tx = Some(desc_tx); + let mut blob_rx = Some(blob_rx); + // Set once `error_handler` has been notified of a missing blob: the + // operator stops doing work but retains its capabilities so the frontier + // does not advance past data we did not emit. + let mut failed = false; + + move |frontiers| { + // Keep the fetch task alive for as long as the operator runs. + let _ = &task; + + // Two-phase shutdown, mirroring `builder_async`: only once *all* + // workers have pressed do we drop capabilities and drain the input. + if shutdown_handle.local_pressed() { + return if shutdown_handle.all_pressed() { + inflight_caps.clear(); + desc_tx = None; + blob_rx = None; + descs_input.for_each(|_cap, _data| {}); + false + } else { + // Local press only: wedge. Keep capabilities and leave the + // input undrained so its pending messages hold the frontier. + true + }; + } + + activation_ack.ack(); + + if failed { + // Frozen: retain every outstanding capability so the frontier + // stays at the missing part and never advances past data we did + // not emit. Crucially we must NOT keep draining `blob_rx`: a + // later, successfully fetched part would otherwise pop the failed + // part's capability off the front of `inflight_caps` (FIFO) and + // advance the frontier past it. Still drain the input to avoid + // stalling the dataflow. + descs_input.for_each(|_cap, _data| {}); + return true; + } + + // Forward incoming descs to the fetch task, retaining a capability + // pair for each. + descs_input.for_each(|cap, data| { + for (_idx, part) in data.drain(..) { + let fetched_cap = cap.delayed(cap.time(), 0); + let completed_cap = cap.delayed(cap.time(), 1); + inflight_caps.push_back((fetched_cap, completed_cap)); + desc_tx + .as_ref() + .expect("desc_tx alive while operator is running") + .send(part) + .expect("fetch task unexpectedly gone"); + } + }); + + // Drain completed fetches, emitting each at its retained capability. + if let Some(rx) = blob_rx.as_mut() { + loop { + match rx.try_recv() { + Ok(Ok(fetched)) => { + let (fetched_cap, _completed_cap) = inflight_caps + .pop_front() + .expect("capability for every in-flight fetch"); + fetched_output + .activate() + .session(&fetched_cap) + .give(fetched); + // `_completed_cap` drops here, advancing the + // `completed_fetches` frontier past this part. + } + Ok(Err((blob_key, diagnostics))) => { + // Report the missing blob and freeze: capabilities are + // retained (including this failed part's) so the + // frontier cannot advance past the missing part. + error_handler.report_and_freeze(anyhow!( + "batch fetcher could not fetch batch part {}: {}", + blob_key, + diagnostics + )); + failed = true; + break; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + // The task exits only after the desc channel closes + // (which we haven't done) or a panic; with fetches + // outstanding this is unexpected. + assert!( + inflight_caps.is_empty(), + "fetch task unexpectedly gone with {} fetches in flight", + inflight_caps.len() + ); + break; } - }; - { - // Do very fine-grained output activation/session - // creation to ensure that we don't hold activated - // outputs or sessions across await points, which - // would prevent messages from being flushed from - // the shared timely output buffer. - fetched_output.give(&fetched_cap, fetched); } } } + + // Once the input is closed and nothing is in flight, disconnect from + // the task so it can exit. + if frontiers[0].frontier().is_empty() && inflight_caps.is_empty() { + desc_tx = None; + } + + // Keep the operator alive; it is torn down via the shutdown button. + true } }); @@ -693,14 +1061,19 @@ mod tests { use super::*; use std::sync::Arc; - use mz_persist::location::SeqNo; + use mz_persist::location::{Blob, SeqNo}; + use mz_persist_types::codec_impls::StringSchema; use timely::dataflow::operators::Leave; use timely::dataflow::operators::Probe; + use timely::dataflow::operators::capture::{Capture, Event as CaptureEvent}; use timely::dataflow::operators::probe::Handle as ProbeHandle; use timely::progress::Antichain; + use crate::batch::{INLINE_WRITES_SINGLE_MAX_BYTES, INLINE_WRITES_TOTAL_MAX_BYTES}; + use crate::cache::PersistClientCache; + use crate::internal::paths::PartialBlobKey; use crate::operators::shard_source::shard_source; - use crate::{Diagnostics, ShardId}; + use crate::{Diagnostics, PersistLocation, ShardId}; #[mz_ore::test] fn test_lease_manager() { @@ -862,6 +1235,382 @@ mod tests { assert_eq!(res, Antichain::from_elem(expected_frontier)); } + /// Verifies that the source fetches and emits actual data: a batch written + /// before the dataflow starts comes out as at least one `FetchedBlob`, and + /// the output frontier reaches the shard's upper. Exercises the listen task + /// -> descs operator -> fetch task -> fetch operator pipeline end-to-end. + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // too slow + async fn test_shard_source_fetches_data() { + let persist_client = PersistClient::new_for_tests().await; + let shard_id = ShardId::new(); + + let mut write = persist_client + .open_writer::( + shard_id, + Arc::new(StringSchema), + Arc::new(StringSchema), + Diagnostics::for_tests(), + ) + .await + .expect("invalid usage"); + let data = [ + (("k1".to_owned(), "v1".to_owned()), 0u64, 1u64), + (("k2".to_owned(), "v2".to_owned()), 1u64, 1u64), + ]; + write.expect_compare_and_append(&data[..], 0, 5).await; + + let expected_frontier = 5; + let (blob_count, frontier) = timely::execute::execute_directly(move |worker| { + let as_of = Antichain::from_elem(0); + let until = Antichain::new(); + + let (capture, probe, token) = worker.dataflow::(|outer| { + let (stream, token) = outer.scoped::("hybrid", |scope| { + let transformer = move |_, descs, _| (descs, vec![]); + let (stream, tokens) = shard_source::( + outer, + scope, + "test_source", + move || std::future::ready(persist_client.clone()), + shard_id, + Some(as_of), + SnapshotMode::Include, + until, + Some(transformer), + Arc::new(StringSchema), + Arc::new(StringSchema), + FilterResult::keep_all, + false.then_some(|| unreachable!()), + async {}, + ErrorHandler::Halt("test"), + ); + (stream.leave(outer), tokens) + }); + + let probe = ProbeHandle::new(); + let stream = stream.probe_with(&probe); + (stream.capture(), probe, token) + }); + + let deadline = Instant::now() + std::time::Duration::from_secs(60); + while probe.less_than(&expected_frontier) { + assert!( + Instant::now() < deadline, + "timed out waiting for output frontier {expected_frontier}" + ); + worker.step(); + } + drop(token); + + let mut blob_count = 0; + while let Ok(event) = capture.try_recv() { + if let CaptureEvent::Messages(_, msgs) = event { + blob_count += msgs.len(); + } + } + let mut frontier = Antichain::new(); + probe.with_frontier(|f| frontier.extend(f.iter().cloned())); + (blob_count, frontier) + }); + + assert!(blob_count >= 1, "expected at least one fetched blob"); + assert_eq!(frontier, Antichain::from_elem(expected_frontier)); + } + + /// Verifies that dropping the source's tokens while it is running does not + /// panic or wedge the worker: capabilities are released so the dataflow can + /// shut down to the empty frontier. + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // too slow + async fn test_shard_source_shutdown_mid_stream() { + let persist_client = PersistClient::new_for_tests().await; + let shard_id = ShardId::new(); + + let mut write = persist_client + .open_writer::( + shard_id, + Arc::new(StringSchema), + Arc::new(StringSchema), + Diagnostics::for_tests(), + ) + .await + .expect("invalid usage"); + let data = [(("k1".to_owned(), "v1".to_owned()), 0u64, 1u64)]; + write.expect_compare_and_append(&data[..], 0, 5).await; + + timely::execute::execute_directly(move |worker| { + let as_of = Antichain::from_elem(0); + // An empty `until` means the source would run forever if not shut + // down by dropping its tokens. + let until = Antichain::new(); + + let (probe, token) = worker.dataflow::(|outer| { + let (stream, token) = outer.scoped::("hybrid", |scope| { + let transformer = move |_, descs, _| (descs, vec![]); + let (stream, tokens) = shard_source::( + outer, + scope, + "test_source", + move || std::future::ready(persist_client.clone()), + shard_id, + Some(as_of), + SnapshotMode::Include, + until, + Some(transformer), + Arc::new(StringSchema), + Arc::new(StringSchema), + FilterResult::keep_all, + false.then_some(|| unreachable!()), + async {}, + ErrorHandler::Halt("test"), + ); + (stream.leave(outer), tokens) + }); + + let probe = ProbeHandle::new(); + let _stream = stream.probe_with(&probe); + (probe, token) + }); + + // Step until the source has made progress, so shutdown happens while + // the listen and fetch machinery is live. + let deadline = Instant::now() + std::time::Duration::from_secs(60); + while probe.less_than(&1) { + assert!(Instant::now() < deadline, "timed out waiting for progress"); + worker.step(); + } + + // Shut down and confirm the dataflow drains: with all tokens dropped, + // the operators must release their capabilities and the frontier must + // become empty. + drop(token); + let deadline = Instant::now() + std::time::Duration::from_secs(60); + loop { + assert!(Instant::now() < deadline, "timed out waiting for shutdown"); + worker.step(); + if probe.with_frontier(|f| f.is_empty()) { + break; + } + } + }); + } + + /// Verifies that an unserveable `as_of` (the listing path) reports an error + /// through the `ErrorHandler` and freezes the source: the output frontier + /// stays at the requested `as_of` and the worker does not panic. + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // too slow + async fn test_shard_source_error_freeze() { + let persist_client = PersistClient::new_for_tests().await; + let shard_id = ShardId::new(); + + // Write data so the shard's upper is past the as_of (otherwise + // `snapshot` blocks waiting for the upper instead of erroring on the + // since), then advance the since past the as_of we'll request. + let mut write = persist_client + .open_writer::( + shard_id, + Arc::new(StringSchema), + Arc::new(StringSchema), + Diagnostics::for_tests(), + ) + .await + .expect("invalid usage"); + let data = [(("k1".to_owned(), "v1".to_owned()), 0u64, 1u64)]; + write.expect_compare_and_append(&data[..], 0, 5).await; + initialize_shard(&persist_client, shard_id, Antichain::from_elem(3)).await; + + let (errored, frontier) = timely::execute::execute_directly(move |worker| { + let as_of = Antichain::from_elem(1); + let until = Antichain::new(); + + let errored = Rc::new(std::cell::Cell::new(false)); + let error_handler = ErrorHandler::signal({ + let errored = Rc::clone(&errored); + move |_err| errored.set(true) + }); + + let (probe, _token) = worker.dataflow::(|outer| { + let (stream, token) = outer.scoped::("hybrid", |scope| { + let transformer = move |_, descs, _| (descs, vec![]); + let (stream, tokens) = shard_source::( + outer, + scope, + "test_source", + move || std::future::ready(persist_client.clone()), + shard_id, + Some(as_of), + SnapshotMode::Include, + until, + Some(transformer), + Arc::new(StringSchema), + Arc::new(StringSchema), + FilterResult::keep_all, + false.then_some(|| unreachable!()), + async {}, + error_handler, + ); + (stream.leave(outer), tokens) + }); + + let probe = ProbeHandle::new(); + let _stream = stream.probe_with(&probe); + (probe, token) + }); + + let deadline = Instant::now() + std::time::Duration::from_secs(60); + while !errored.get() { + assert!(Instant::now() < deadline, "timed out waiting for error"); + worker.step(); + } + // Keep stepping; the source must stay frozen at the as_of. + for _ in 0..100 { + worker.step(); + } + + let mut frontier = Antichain::new(); + probe.with_frontier(|f| frontier.extend(f.iter().cloned())); + (errored.get(), frontier) + }); + + assert!(errored); + assert_eq!(frontier, Antichain::from_elem(1)); + } + + /// Regression test for the `shard_source_fetch` freeze path: a blob that + /// goes missing while *fetching* (the listing path is covered by + /// `test_shard_source_error_freeze`) must freeze the output frontier at the + /// missing part, not keep draining later results — which would match them + /// to earlier capabilities and advance the frontier past data never emitted. + /// + /// We delete the first batch's blob, which is read by the snapshot at + /// `as_of = 0`. Its fetch fails; the later batches (t=1, t=2) fetch fine. A + /// buggy source that keeps draining after freezing pops the failed part's + /// capability off the front of `inflight_caps` and advances the frontier + /// onto the later parts. We step until the dataflow quiesces (with brief + /// parks so the tokio fetch task finishes), guaranteeing the later results + /// are produced and would be drained under the bug; a fixed-iteration wait + /// would race the task and mask it. + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // too slow + async fn test_shard_source_fetch_error_freeze() { + // Force writes to real blobs (inline parts have no blob to delete) and + // disable compaction so the three batches stay distinct and deletable. + let mut cache = PersistClientCache::new_no_metrics(); + cache.cfg.compaction_enabled = false; + cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0); + cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0); + let persist_client = cache + .open(PersistLocation::new_in_mem()) + .await + .expect("in-mem location is valid"); + let shard_id = ShardId::new(); + // Clones of a `PersistClient` share the blob `Arc`, so deleting via this + // handle is visible to the reader the source opens. + let blob = Arc::clone(&persist_client.blob); + + let mut write = persist_client + .open_writer::( + shard_id, + Arc::new(StringSchema), + Arc::new(StringSchema), + Diagnostics::for_tests(), + ) + .await + .expect("invalid usage"); + + // The data-part (non-rollup) blob keys currently present. + async fn batch_keys(blob: &dyn Blob) -> std::collections::BTreeSet { + let mut keys = std::collections::BTreeSet::new(); + blob.list_keys_and_metadata("", &mut |meta| { + if let Ok((_, PartialBlobKey::Batch(..))) = BlobKey::parse_ids(meta.key) { + keys.insert(meta.key.to_owned()); + } + }) + .await + .expect("list keys"); + keys + } + + let row = |t: u64| ((format!("k{t}"), format!("v{t}")), t, 1u64); + let before = batch_keys(blob.as_ref()).await; + write.expect_compare_and_append(&[row(0)], 0, 1).await; + let after = batch_keys(blob.as_ref()).await; + write.expect_compare_and_append(&[row(1)], 1, 2).await; + write.expect_compare_and_append(&[row(2)], 2, 3).await; + + // Delete exactly the first (t=0) batch's data part(s); the snapshot at + // as_of=0 reads it. + let missing: Vec<_> = after.difference(&before).cloned().collect(); + assert!(!missing.is_empty(), "first batch wrote no blob part"); + for key in &missing { + blob.delete(key).await.expect("delete"); + } + + let frontier = timely::execute::execute_directly(move |worker| { + let errored = Rc::new(std::cell::Cell::new(false)); + let error_handler = ErrorHandler::signal({ + let errored = Rc::clone(&errored); + move |_err| errored.set(true) + }); + + let (probe, _token) = worker.dataflow::(|outer| { + let (stream, token) = outer.scoped::("hybrid", |scope| { + let (stream, tokens) = shard_source::( + outer, + scope, + "test_source", + move || std::future::ready(persist_client.clone()), + shard_id, + Some(Antichain::from_elem(0)), + SnapshotMode::Include, + Antichain::new(), + Some(move |_, descs, _| (descs, vec![])), + Arc::new(StringSchema), + Arc::new(StringSchema), + FilterResult::keep_all, + false.then_some(|| unreachable!()), + async {}, + error_handler, + ); + (stream.leave(outer), tokens) + }); + let probe = ProbeHandle::new(); + stream.probe_with(&probe); + (probe, token) + }); + + // Step until the fetch error fires, then step until the dataflow + // quiesces, with brief parks so the tokio fetch task can finish. + let deadline = Instant::now() + std::time::Duration::from_secs(60); + while !errored.get() { + assert!( + Instant::now() < deadline, + "timed out waiting for fetch error" + ); + worker.step_or_park(Some(std::time::Duration::from_millis(1))); + } + let mut last = probe.with_frontier(|f| f.to_owned()); + let mut stable = 0; + while stable < 100 { + assert!(Instant::now() < deadline, "timed out waiting for quiesce"); + worker.step_or_park(Some(std::time::Duration::from_millis(1))); + let now = probe.with_frontier(|f| f.to_owned()); + if now == last { + stable += 1; + } else { + stable = 0; + last = now; + } + } + last + }); + + // Frozen at the missing part (t=0); the bug advanced this past it. + assert_eq!(frontier, Antichain::from_elem(0)); + } + async fn initialize_shard( persist_client: &PersistClient, shard_id: ShardId, From fd3a6687db2f2d0e7a7a258df33ef320cedb0c47 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 16 Jun 2026 10:54:52 +0200 Subject: [PATCH 2/4] txn-wal: bound read-worker park in stress test With the persist source's listen polling moved into a tokio task, a fully caught-up DataSubscribe with no listen retry timer produces no worker activations. The stress test's read workers parked indefinitely on step_or_park(None) while polling a oneshot, so they never observed the shutdown signal. Bound the parks so they re-check between steps. Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/txns.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/txn-wal/src/txns.rs b/src/txn-wal/src/txns.rs index 59e8c68414494..ef60c0544dd58 100644 --- a/src/txn-wal/src/txns.rs +++ b/src/txn-wal/src/txns.rs @@ -1525,7 +1525,15 @@ mod tests { let data_id = format!("{:.9}", data_id.to_string()); let _guard = info_span!("read_worker", %data_id, as_of).entered(); loop { - subscribe.worker.step_or_park(None); + // NB: Bound the park. The `rx` oneshot and the progress + // check below are polled between steps but do not + // activate the worker, so an indefinite park can miss + // them. (The persist source does its listen polling in a + // tokio task; a fully caught-up source with no listen + // retry timer produces no further activations.) + subscribe + .worker + .step_or_park(Some(Duration::from_millis(1))); subscribe.capture_output(); let until = match rx.try_recv() { Ok(ts) => ts, @@ -1535,7 +1543,9 @@ mod tests { Err(oneshot::error::TryRecvError::Closed) => 0, }; while subscribe.progress() < until { - subscribe.worker.step_or_park(None); + subscribe + .worker + .step_or_park(Some(Duration::from_millis(1))); subscribe.capture_output(); } return subscribe.output().clone(); From f922e92b5fc27b075295d9f612d82fef50c02e49 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 16 Jun 2026 10:54:53 +0200 Subject: [PATCH 3/4] sqllogictest: shard_source_descs_return merged into descs The lease-return input now lives on the shard_source_descs operator, so introspection names that operator instead. Co-Authored-By: Claude Opus 4.8 --- test/sqllogictest/introspection/relations.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/sqllogictest/introspection/relations.slt b/test/sqllogictest/introspection/relations.slt index a1f31e8090c53..30951350119f9 100644 --- a/test/sqllogictest/introspection/relations.slt +++ b/test/sqllogictest/introspection/relations.slt @@ -71,7 +71,7 @@ SuppressEarlyProgress LimitProgress(Dataflow:␠materialize.public.test_primary decode_backpressure_probe(u1) Feedback alloc::vec::Vec expire_stream_at(materialize.public.test_primary_idx_export_index_errs) LogDataflowErrorsStream alloc::vec::Vec)>>>> expire_stream_at(materialize.public.test_primary_idx_export_index_oks) InspectBatch alloc::vec::Vec)>>>> -granular_backpressure(u1) shard_source_descs_return(u1) alloc::vec::Vec +granular_backpressure(u1) shard_source_descs(u1) alloc::vec::Vec granular_backpressure(u1) txns_progress_frontiers(u1) alloc::vec::Vec<(core::result::Result,␠(mz_repr::timestamp::Timestamp,␠mz_storage_operators::persist_source::Subtime),␠mz_ore::overflowing::Overflowing)> persist_source::decode_and_mfp(u1) InspectBatch alloc::vec::Vec<(core::result::Result,␠(mz_repr::timestamp::Timestamp,␠mz_storage_operators::persist_source::Subtime),␠mz_ore::overflowing::Overflowing)> persist_source_backpressure(backpressure(u1)) shard_source_fetch(u1) alloc::vec::Vec<(usize,␠mz_persist_client::fetch::ExchangeableBatchPart)> From ffa9df5f76966453ce35030cba7048788522c672 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 16 Jun 2026 13:27:28 +0200 Subject: [PATCH 4/4] persist: key shard_source_fetch bookkeeping by time, not FIFO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fetch operator matched results to capabilities by arrival order: it pushed a capability pair per desc into a VecDeque and popped the front for each result. That was sound only because the fetch task is a strictly sequential loop, and it would silently misassign capabilities the moment the task fetched concurrently — releasing a part's completed-fetches capability (and its lease) for a part not actually fetched. Tag each desc with the time it was minted at, echo that time back with the result, and track outstanding fetches in a per-time BTreeMap of (data cap, completed cap, count). Emit each blob at its time's capability and drop both capabilities when a time's count reaches zero. This is independent of the order results return in, and it aligns with the descs-side LeaseManager, which is also keyed by time. As a bonus the freeze-on-error is now structural: the failed time's capability is never decremented, so the frontier holds there regardless of what else drains. Per review feedback on #36910. Co-Authored-By: Claude Opus 4.8 --- .../src/operators/shard_source.rs | 127 ++++++++++-------- 1 file changed, 70 insertions(+), 57 deletions(-) diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index 5efa2680f0ae6..1bfbef6dddbcf 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -72,11 +72,15 @@ //! the separate `shard_source_descs_return` operator) is merged in as a //! disconnected `completed_fetches` input. //! -//! **`shard_source_fetch`** forwards each incoming desc to its fetch task and -//! retains a *pair* of capabilities for it: one for the data output, one for -//! the `completed_fetches` output. The task downloads parts in order; the -//! operator matches results to capabilities FIFO, emits the [FetchedBlob] at -//! the first capability, and drops the second. +//! **`shard_source_fetch`** forwards each incoming desc to its fetch task, +//! tagged with the time it was minted at, and retains a capability pair (data +//! output + `completed_fetches` output) per time, counting how many fetches at +//! that time are outstanding. The task echoes the time back with each result; +//! the operator emits the [FetchedBlob] at that time's data capability and, +//! when a time's outstanding count reaches zero, drops both of its capabilities +//! — advancing the data frontier and releasing that time's leases. Keying by +//! time rather than arrival order makes the operator independent of the order +//! results come back in. //! //! **Lease lifecycle**: dropping the completed-fetches capability advances a //! feedback loop back into `shard_source_descs`, whose frontier advances the @@ -99,10 +103,10 @@ //! * [LeasedBatchPart]s panic on drop while leased; only the lease-split //! [ExchangeableBatchPart] representation may cross channels, where it can be //! dropped harmlessly on shutdown. -//! * The fetch task processes descs strictly in order; the FIFO capability -//! matching in the fetch operator is only sound because of this. Once a fetch -//! fails the operator freezes and must STOP draining results — a later, good -//! result would otherwise pop the failed part's capability and advance the +//! * Each result carries its own time, so the operator does not depend on the +//! order the task returns them. Once a fetch fails the operator freezes and +//! must STOP draining results — a later, good result would otherwise release +//! a capability and advance the //! frontier past data never emitted. //! * The `completed_fetches` feedback edge carries no data (`Infallible`); it //! exists for its frontier, which signals cross-process fetch completion, @@ -117,8 +121,8 @@ //! [LeasedBatchPart]: crate::fetch::LeasedBatchPart //! [AbortOnDropHandle]: mz_ore::task::AbortOnDropHandle +use std::collections::BTreeMap; use std::collections::hash_map::DefaultHasher; -use std::collections::{BTreeMap, VecDeque}; use std::convert::Infallible; use std::fmt::{Debug, Formatter}; use std::future::Future; @@ -868,10 +872,15 @@ where // reader's lease diagnostics so the operator can distinguish a lease expiry // from a GC bug. The task wakes the operator through the activator after // each result. - let (desc_tx, mut desc_rx) = tokio::sync::mpsc::unbounded_channel::>(); - let (blob_tx, blob_rx) = tokio::sync::mpsc::unbounded_channel::< + // Each desc is tagged with the time it was minted at; the task echoes that + // time back with the result so the operator can emit at the right + // capability without relying on the order results come back in. + let (desc_tx, mut desc_rx) = + tokio::sync::mpsc::unbounded_channel::<(TInner, ExchangeableBatchPart)>(); + let (blob_tx, blob_rx) = tokio::sync::mpsc::unbounded_channel::<( + TInner, Result, (BlobKey, String)>, - >(); + )>(); let (activator, activation_ack) = ArcActivator::new(scope.clone(), &info); // The fetch task owns the `BatchFetcher` and performs all async work: @@ -893,7 +902,7 @@ where ) .await .expect("shard codecs should not change"); - while let Some(part) = desc_rx.recv().await { + while let Some((time, part)) = desc_rx.recv().await { let reader_id = part.reader_id().clone(); let fetched = fetcher .fetch_leased_part(part) @@ -917,7 +926,7 @@ where Err((blob_key, diagnostics)) } }; - if blob_tx.send(fetched).is_err() { + if blob_tx.send((time, fetched)).is_err() { // The operator is gone; stop fetching. return; } @@ -930,13 +939,17 @@ where let (mut shutdown_handle, shutdown_button) = button(scope, info.address); builder.build_reschedule(move |_capabilities| { - // Per-flight capabilities, in the order parts were sent to the fetch - // task (which processes and returns them in the same order). The first - // capability emits the fetched blob; dropping the second advances the - // `completed_fetches` frontier, which releases the part's lease on the - // chosen worker. Holding both until the fetch completes keeps the lease - // alive while the download is in flight. - let mut inflight_caps: VecDeque<(Capability, Capability)> = VecDeque::new(); + // Outstanding fetches, keyed by the time the part was minted at. For + // each time we retain a capability on each output (data and + // completed-fetches) and count how many fetches at that time are in + // flight. When the count reaches zero we drop both capabilities, + // advancing the data and completed-fetches frontiers past that time; + // the latter releases the parts' leases on the chosen worker (whose + // `LeaseManager` is likewise keyed by time). Keying by time rather than + // by arrival order makes the operator robust to the task returning + // results in any order — e.g. if it ever fetches concurrently. + let mut outstanding: BTreeMap, Capability, usize)> = + BTreeMap::new(); // Wrapped in `Option` so we can drop the sender to signal the task that // no more descs are coming. let mut desc_tx = Some(desc_tx); @@ -954,7 +967,7 @@ where // workers have pressed do we drop capabilities and drain the input. if shutdown_handle.local_pressed() { return if shutdown_handle.all_pressed() { - inflight_caps.clear(); + outstanding.clear(); desc_tx = None; blob_rx = None; descs_input.for_each(|_cap, _data| {}); @@ -972,45 +985,47 @@ where // Frozen: retain every outstanding capability so the frontier // stays at the missing part and never advances past data we did // not emit. Crucially we must NOT keep draining `blob_rx`: a - // later, successfully fetched part would otherwise pop the failed - // part's capability off the front of `inflight_caps` (FIFO) and - // advance the frontier past it. Still drain the input to avoid - // stalling the dataflow. + // later, successfully fetched part would otherwise release a + // capability and let the frontier advance past the missing part. + // Still drain the input to avoid stalling the dataflow. descs_input.for_each(|_cap, _data| {}); return true; } // Forward incoming descs to the fetch task, retaining a capability - // pair for each. + // pair per time and counting the fetch as outstanding. descs_input.for_each(|cap, data| { for (_idx, part) in data.drain(..) { - let fetched_cap = cap.delayed(cap.time(), 0); - let completed_cap = cap.delayed(cap.time(), 1); - inflight_caps.push_back((fetched_cap, completed_cap)); + let entry = outstanding.entry(cap.time().clone()).or_insert_with(|| { + (cap.delayed(cap.time(), 0), cap.delayed(cap.time(), 1), 0) + }); + entry.2 += 1; desc_tx .as_ref() .expect("desc_tx alive while operator is running") - .send(part) + .send((cap.time().clone(), part)) .expect("fetch task unexpectedly gone"); } }); - // Drain completed fetches, emitting each at its retained capability. + // Drain completed fetches, emitting each at the capability for its + // time. if let Some(rx) = blob_rx.as_mut() { loop { match rx.try_recv() { - Ok(Ok(fetched)) => { - let (fetched_cap, _completed_cap) = inflight_caps - .pop_front() - .expect("capability for every in-flight fetch"); - fetched_output - .activate() - .session(&fetched_cap) - .give(fetched); - // `_completed_cap` drops here, advancing the - // `completed_fetches` frontier past this part. + Ok((time, Ok(fetched))) => { + let entry = outstanding + .get_mut(&time) + .expect("capability for every in-flight fetch time"); + fetched_output.activate().session(&entry.0).give(fetched); + entry.2 -= 1; + if entry.2 == 0 { + // Drops both capabilities, advancing the data and + // completed-fetches frontiers past `time`. + outstanding.remove(&time); + } } - Ok(Err((blob_key, diagnostics))) => { + Ok((_time, Err((blob_key, diagnostics)))) => { // Report the missing blob and freeze: capabilities are // retained (including this failed part's) so the // frontier cannot advance past the missing part. @@ -1028,9 +1043,9 @@ where // (which we haven't done) or a panic; with fetches // outstanding this is unexpected. assert!( - inflight_caps.is_empty(), - "fetch task unexpectedly gone with {} fetches in flight", - inflight_caps.len() + outstanding.is_empty(), + "fetch task unexpectedly gone with {} outstanding fetch times", + outstanding.len() ); break; } @@ -1040,7 +1055,7 @@ where // Once the input is closed and nothing is in flight, disconnect from // the task so it can exit. - if frontiers[0].frontier().is_empty() && inflight_caps.is_empty() { + if frontiers[0].frontier().is_empty() && outstanding.is_empty() { desc_tx = None; } @@ -1481,17 +1496,15 @@ mod tests { /// Regression test for the `shard_source_fetch` freeze path: a blob that /// goes missing while *fetching* (the listing path is covered by /// `test_shard_source_error_freeze`) must freeze the output frontier at the - /// missing part, not keep draining later results — which would match them - /// to earlier capabilities and advance the frontier past data never emitted. + /// missing part and report the error, never advancing past data never + /// emitted. /// /// We delete the first batch's blob, which is read by the snapshot at - /// `as_of = 0`. Its fetch fails; the later batches (t=1, t=2) fetch fine. A - /// buggy source that keeps draining after freezing pops the failed part's - /// capability off the front of `inflight_caps` and advances the frontier - /// onto the later parts. We step until the dataflow quiesces (with brief - /// parks so the tokio fetch task finishes), guaranteeing the later results - /// are produced and would be drained under the bug; a fixed-iteration wait - /// would race the task and mask it. + /// `as_of = 0`. Its fetch fails; the later batches (t=1, t=2) fetch fine. We + /// step until the dataflow quiesces (with brief parks so the tokio fetch + /// task finishes), so the later results are produced, then assert the error + /// fired and the frontier stayed at the missing part — the failed time's + /// capability is held regardless of what other times complete. #[mz_ore::test(tokio::test(flavor = "multi_thread"))] #[cfg_attr(miri, ignore)] // too slow async fn test_shard_source_fetch_error_freeze() {