diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs
index ac4cbfa165be4..1bfbef6dddbcf 100644
--- a/src/persist-client/src/operators/shard_source.rs
+++ b/src/persist-client/src/operators/shard_source.rs
@@ -8,8 +8,119 @@
// by the Apache License, Version 2.0.
//! A source that reads from a persist shard.
+//!
+//! # For consumers
+//!
+//! [shard_source] renders a set of timely operators that continuously read a
+//! persist shard and emit its contents as a stream of [FetchedBlob]s: parts
+//! that have been downloaded from blob storage but not yet decoded. Callers
+//! decode them (e.g. via [FetchedBlob::parse]) downstream, typically on the
+//! same worker, so that decode throughput scales with the number of workers.
+//!
+//! The source observes the following contract:
+//!
+//! * **Times**: all emitted times are advanced by the given `as_of`. With
+//! [SnapshotMode::Include] the shard contents as of `as_of` are emitted at
+//! the `as_of`; with [SnapshotMode::Exclude] only subsequent updates are.
+//! * **Frontier**: the output frontier tracks the shard's `upper`, eagerly
+//! downgraded to the `as_of` before the snapshot is available so downstream
+//! consumers (e.g. `persist_sink`) can rely on close frontier tracking. When
+//! `until` is non-empty, parts lying entirely at or beyond `until` are
+//! dropped and the source eventually completes; fine-grained filtering of
+//! individual updates against `until` is the caller's responsibility.
+//! * **Distribution**: parts are distributed across all workers, regardless of
+//! which worker coordinates the read.
+//! * **Filter pushdown**: `filter_fn` is consulted with each part's stats and
+//! may keep the part, discard it without fetching, or replace its contents
+//! with a single-row constant ([FilterResult]). A random sample of discarded
+//! parts is fetched anyway to audit the decision.
+//! * **Errors**: conditions that are neither data-plane errors nor bugs (an
+//! unserveable `as_of`, a missing blob after a lease timeout) are reported to
+//! the given [ErrorHandler]. The source then freezes: it stops doing work but
+//! retains its capabilities, so the frontier never advances past unproduced
+//! data while a halt or dataflow restart is pending.
+//! * **Shutdown**: dropping the returned [PressOnDropButton] tokens shuts the
+//! source down, dropping all held capabilities and abandoning in-flight work.
+//!
+//! # For implementors
+//!
+//! The source is split into two synchronous timely operators, each paired with
+//! a tokio task that owns all async persist work. Operators and tasks
+//! communicate over channels; tasks wake their operator through a
+//! [SyncActivator]-backed [ArcActivator], which also unparks a parked worker.
+//!
+//! ```text
+//! tokio: listen task ──(parts+leases, progress, mpsc)──> [shard_source_descs]
+//! (chosen worker only)                                            │ exchange by assigned worker
+//!          ▲ oneshot: drop listen                                 ▼
+//!          └──────────────────────────────────────────── [shard_source_fetch] (per worker)
+//!            completed_fetches feedback                        │      ▲
+//!                                                         desc │      │ FetchedBlob
+//!                                                       (mpsc) ▼      │ (mpsc)
+//!                                                   tokio: fetch task (per worker)
+//! ```
+//!
+//! **`shard_source_descs`** runs on all workers, but only the chosen worker
+//! (hash of the name) spawns a listen task and holds capabilities. The task
+//! opens a leased reader, waits for the `start_signal`, resolves the `as_of`,
+//! and walks snapshot + listen, applying `filter_fn` and the audit budget. It
+//! splits each [LeasedBatchPart] into an [ExchangeableBatchPart] plus its
+//! [Lease] and sends both to the operator, along with progress updates that
+//! drive the operator's capability downgrades. The operator emits
+//! `(worker_idx, part)` pairs — exchanged by index — and parks each lease in a
+//! `LeaseManager` keyed by the part's time. The lease-return input (formerly
+//! the separate `shard_source_descs_return` operator) is merged in as a
+//! disconnected `completed_fetches` input.
+//!
+//! **`shard_source_fetch`** forwards each incoming desc to its fetch task,
+//! tagged with the time it was minted at, and retains a capability pair (data
+//! output + `completed_fetches` output) per time, counting how many fetches at
+//! that time are outstanding. The task echoes the time back with each result;
+//! the operator emits the [FetchedBlob] at that time's data capability and,
+//! when a time's outstanding count reaches zero, drops both of its capabilities
+//! — advancing the data frontier and releasing that time's leases. Keying by
+//! time rather than arrival order makes the operator independent of the order
+//! results come back in.
+//!
+//! **Lease lifecycle**: dropping the completed-fetches capability advances the
+//! frontier of the feedback edge into `shard_source_descs`, which advances the
+//! `LeaseManager` and drops the leases for fetched parts. The listen task — and
+//! with it the reader and its SeqNo hold, which is what actually protects
+//! unfetched parts' blobs from GC — must outlive all in-flight fetches: the
+//! operator releases it through a oneshot only once the completed-fetches
+//! frontier is empty. If the dataflow is dropped instead, the tasks are aborted
+//! via their [AbortOnDropHandle]s.
+//!
+//! **Two-phase shutdown**: both operators return a [PressOnDropButton] and so
+//! participate in `builder_async`'s coordinated shutdown. Their schedule
+//! closures use `build_reschedule` and only drop capabilities / drain inputs
+//! once *all* workers have pressed (`all_pressed`). Dropping capabilities on a
+//! local-only press would let the downstream frontier advance past times that
+//! other workers' instances still feed (cross-worker teardown skew).
+//!
+//! Subtleties worth knowing before changing this code:
+//!
+//! * [LeasedBatchPart]s panic on drop while leased; only the lease-split
+//! [ExchangeableBatchPart] representation may cross channels, where it can be
+//! dropped harmlessly on shutdown.
+//! * Each result carries its own time, so the operator does not depend on the
+//! order the task returns them. Once a fetch fails the operator freezes and
+//! must STOP draining results — a later, good result would otherwise release
+//! a capability and advance the frontier past data never emitted.
+//! * The `completed_fetches` feedback edge carries no data (`Infallible`); it
+//! exists for its frontier, which signals cross-process fetch completion,
+//! wakes the descs operator, and keeps that operator alive (and thus the
+//! listen task's SeqNo hold) until all fetches finish.
+//! * Tests that step workers manually must not park indefinitely while polling
+//! state that does not activate the worker: with the listen in a task, a
+//! caught-up source with no listen retry timer produces no activations.
+//!
+//! [SyncActivator]: timely::scheduling::SyncActivator
+//! [ArcActivator]: mz_timely_util::activator::ArcActivator
+//! [LeasedBatchPart]: crate::fetch::LeasedBatchPart
+//! [AbortOnDropHandle]: mz_ore::task::AbortOnDropHandle
-use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::hash_map::DefaultHasher;
use std::convert::Infallible;
@@ -28,25 +139,26 @@ use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures_util::StreamExt;
use mz_ore::cast::CastFrom;
-use mz_ore::collections::CollectionExt;
use mz_persist_types::stats::PartStats;
use mz_persist_types::{Codec, Codec64};
-use mz_timely_util::builder_async::{
-    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
-};
+use mz_timely_util::activator::ArcActivator;
+use mz_timely_util::builder_async::{PressOnDropButton, button};
use timely::PartialOrder;
-use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
-use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Enter, Feedback, Leave};
+use timely::dataflow::operators::generic::OutputBuilder;
+use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
+use timely::dataflow::operators::{Capability, CapabilitySet, ConnectLoop, Enter, Feedback, Leave};
use timely::dataflow::{Scope, StreamVec};
use timely::order::TotalOrder;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp, timestamp::Refines};
+use tokio::sync::mpsc::error::TryRecvError;
use tracing::{debug, trace};
use crate::batch::BLOB_TARGET_SIZE;
use crate::cfg::{RetryParameters, USE_CRITICAL_SINCE_SOURCE};
use crate::fetch::{ExchangeableBatchPart, FetchedBlob, Lease};
+use crate::internal::paths::BlobKey;
use crate::internal::state::BatchPart;
use crate::stats::{STATS_AUDIT_PERCENT, STATS_FILTER_ENABLED};
use crate::{Diagnostics, PersistClient, ShardId};
@@ -109,6 +221,19 @@ impl ErrorHandler {
         Self::Signal(Rc::new(signal_fn))
     }
+    /// Signal an error to an error handler from a synchronous operator. For [ErrorHandler::Halt]
+    /// this never returns; for [ErrorHandler::Signal] it returns after invoking the callback, and
+    /// the caller is responsible for "freezing": retaining its capabilities and doing no further
+    /// work, so that no spurious progress is observable while a restart is pending.
+    pub fn report_and_freeze(&self, error: anyhow::Error) {
+        match self {
+            ErrorHandler::Halt(name) => {
+                mz_ore::halt!("unhandled error in {name}: {error:#}")
+            }
+            ErrorHandler::Signal(callback) => callback(error),
+        }
+    }
+
     /// Signal an error to an error handler. This function never returns: logically it blocks until
     /// restart, though that restart might be sooner (if halting) or later (if triggering a dataflow
     /// restart, for example).
@@ -150,10 +275,10 @@ pub fn shard_source<'inner, 'outer, K, V, T, D, DT, TOuter, C>(
desc_transformer: Option
,
key_schema: Arc,
val_schema: Arc,
- filter_fn: impl FnMut(&PartStats, AntichainRef) -> FilterResult + 'static,
+ filter_fn: impl FnMut(&PartStats, AntichainRef) -> FilterResult + Send + 'static,
// If Some, an override for the default listen sleep retry parameters.
- listen_sleep: Option RetryParameters + 'static>,
- start_signal: impl Future