
persist: deasync shard_source operators#36910

Open
antiguru wants to merge 4 commits into MaterializeInc:main from antiguru:persist-source-deasync-design

antiguru commented Jun 4, 2026

Member

Motivation

The persist source operators are built on the timely async bridge, which we are migrating away from (cf. #36810). This converts them to synchronous operators with no behavior change.

(This PR previously also changed the fetch-semaphore gating; that has been dropped to keep it a single-purpose deasync. The memory/backpressure work is tracked separately.)

Description

Convert shard_source_descs and shard_source_fetch from AsyncOperatorBuilder to synchronous OperatorBuilderRc operators, each paired with a tokio task that owns the persist I/O.

  • shard_source_descs runs a listen task on the chosen worker that owns the reader, snapshot, listen loop, and stats filtering, and sends parts (split into ExchangeableBatchPart + Lease) plus progress over a channel. The former shard_source_descs_return operator is merged in as a disconnected completed_fetches input; the listen handle (the reader's SeqNo hold) is released via a oneshot once that frontier empties, preserving the old lease lifetime.
  • shard_source_fetch forwards descs to a fetch task and retains a per-flight capability pair; results are matched FIFO, emitted at the data capability, with the completed-fetches capability dropped to release the lease only after the download completes. On a missing blob it reports through the ErrorHandler and freezes — retaining capabilities and ceasing to drain results, so a later good result cannot advance the frontier past the missing part.
  • Both operators reproduce builder_async's two-phase shutdown (build_reschedule + the coordinated button), so a local-only press cannot advance the downstream frontier past times other workers' instances still feed.
  • filter_fn, listen_sleep, and start_signal gain Send bounds; all callers already satisfy them.
  • The txn-wal stress test parked its read workers indefinitely between steps, relying on the async operators' listen retries as accidental wakeups; with the listen in a task a caught-up source produces none, so the parks are now bounded.
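
The operator/task split described above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: `ListenMsg`, `DescsOperator`, and `spawn_listen` are hypothetical names, a plain thread stands in for the tokio task, a `u64` stands in for a held capability, and the activator that would wake the operator when a message arrives is omitted.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;

/// Messages the I/O task sends to the operator (hypothetical shape).
#[derive(Debug, PartialEq)]
pub enum ListenMsg {
    /// A part discovered at `time`.
    Part { time: u64, key: String },
    /// No further parts will arrive at times below `upper`.
    Progress { upper: u64 },
}

/// Operator-side state: the channel plus a stand-in for the held capability.
pub struct DescsOperator {
    rx: Receiver<ListenMsg>,
    pub frontier: u64,
    pub emitted: Vec<(u64, String)>,
}

impl DescsOperator {
    /// One activation: drain whatever is ready and never block the worker.
    /// Returns `false` once the task has hung up and the channel is empty.
    pub fn activate(&mut self) -> bool {
        loop {
            match self.rx.try_recv() {
                Ok(ListenMsg::Part { time, key }) => self.emitted.push((time, key)),
                Ok(ListenMsg::Progress { upper }) => self.frontier = self.frontier.max(upper),
                Err(TryRecvError::Empty) => return true,
                Err(TryRecvError::Disconnected) => return false,
            }
        }
    }
}

/// Spawn the stand-in for the listen task and return the paired operator.
pub fn spawn_listen(parts: Vec<(u64, String)>) -> (DescsOperator, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for (time, key) in parts {
            // A send error only means the operator was dropped; ignore it.
            let _ = tx.send(ListenMsg::Part { time, key });
            let _ = tx.send(ListenMsg::Progress { upper: time + 1 });
        }
    });
    (DescsOperator { rx, frontier: 0, emitted: Vec::new() }, handle)
}
```

Calling `spawn_listen(...)` and then `activate()` on each scheduling of the operator mirrors the shape of the design: all blocking work lives on the task side, and the operator only ever polls.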

Verification

New shard_source unit tests: end-to-end fetch, mid-stream shutdown, listing-path error freeze, and fetch-path error freeze (deterministic via quiesce; confirmed to fail without the freeze fix). Existing persist-client, txn-wal (incl. stress_correctness), and the relations.slt introspection test pass locally.

🤖 Generated with Claude Code

}

if failed {
return;

Member Author

Do we still need to drain the msg_rx?

Member Author

No: every error path in the listen task sends Error as its final message and returns, so once the operator has observed an Error the channel is provably empty forever. Added a comment documenting the invariant (8f38bab).
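
A minimal sketch of that invariant, assuming a simplified message type (`Msg`, `listen_task`, and `drain` are hypothetical names, and `std::sync::mpsc` stands in for the real channel): because the task returns right after sending `Error`, its sender is dropped and nothing can be queued behind the error.

```rust
use std::sync::mpsc::{Receiver, Sender, TryRecvError};

#[derive(Debug, PartialEq)]
pub enum Msg {
    Part(u64),
    Error(String),
}

/// Stand-in for the listen task: on an error, send `Error` and return.
/// Returning drops `tx`, so no message can ever follow the error.
pub fn listen_task(tx: Sender<Msg>, parts: Vec<Result<u64, String>>) {
    for part in parts {
        match part {
            Ok(p) => {
                if tx.send(Msg::Part(p)).is_err() {
                    return; // operator went away
                }
            }
            Err(e) => {
                let _ = tx.send(Msg::Error(e));
                return;
            }
        }
    }
}

/// Operator side: stop at the first `Error`; no further drain is needed.
pub fn drain(rx: &Receiver<Msg>) -> (Vec<u64>, Option<String>) {
    let mut parts = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(Msg::Part(p)) => parts.push(p),
            Ok(Msg::Error(e)) => return (parts, Some(e)),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => return (parts, None),
        }
    }
}
```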

Member Author

Good catch — this was a real bug, fixed in 94467ce (rebased onto current main).

On a fetch error the operator set failed and broke out of the drain loop, but kept re-entering it on subsequent activations. A later, successfully fetched part's Ok result would then pop the failed part's capability off the front of inflight_caps (FIFO) and emit at it, advancing the output frontier past the missing part and defeating the freeze. The fix returns early once frozen, retaining every outstanding capability.

I adapted your reproduction: it deletes the first batch's blob and steps until the dataflow quiesces (with brief parks so the tokio fetch task finishes), rather than a fixed iteration count — otherwise it races the task and only catches the bug ~13% of the time. It now fails deterministically without the fix and passes with it.
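
The mis-pop can be reproduced in a toy model. The sketch below is an assumption-laden simplification, not the operator's code: `FetchOp` is a hypothetical type, capabilities are reduced to their `u64` times, and a `freeze_on_error` switch toggles between the buggy and the fixed behavior.

```rust
use std::collections::VecDeque;

/// Simplified fetch operator: capabilities are just their times, held FIFO.
pub struct FetchOp {
    inflight_caps: VecDeque<u64>,
    failed: bool,
    /// When false, reproduces the bug: the drain loop is re-entered after a failure.
    freeze_on_error: bool,
}

impl FetchOp {
    pub fn new(times: &[u64], freeze_on_error: bool) -> Self {
        FetchOp {
            inflight_caps: times.iter().copied().collect(),
            failed: false,
            freeze_on_error,
        }
    }

    /// One activation: drain the fetch results that have arrived so far.
    pub fn activate(&mut self, results: &mut VecDeque<Result<(), String>>) {
        if self.failed && self.freeze_on_error {
            return; // frozen: retain every outstanding capability
        }
        while let Some(result) = results.pop_front() {
            match result {
                // A good result releases the capability at the front.
                Ok(()) => {
                    self.inflight_caps.pop_front();
                }
                // A failed fetch keeps its capability and stops this drain.
                Err(_) => {
                    self.failed = true;
                    break;
                }
            }
        }
    }

    /// Output frontier in this model: the earliest time still held.
    pub fn frontier(&self) -> Option<u64> {
        self.inflight_caps.iter().min().copied()
    }
}
```

With parts at times 0, 1, and 2 and a failed fetch at time 1, a second activation carrying the time-2 result moves the buggy variant's frontier to 2, while the fixed variant stays at 1. That matches the `left: [2]` versus `right: [1]` assertion failure reported below.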

antiguru force-pushed the persist-source-deasync-design branch from aec4880 to 8f38bab on June 5, 2026 10:02
antiguru marked this pull request as ready for review on June 5, 2026 11:02
antiguru requested review from a team and aljoscha as code owners on June 5, 2026 11:02
antiguru requested a review from petrosagg on June 5, 2026 11:03

def- left a comment

Contributor

Claude found this issue:

diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs
index bb4da7fae9..c861e66d8f 100644
--- a/src/persist-client/src/operators/shard_source.rs
+++ b/src/persist-client/src/operators/shard_source.rs
@@ -1055,15 +1055,19 @@ mod tests {
     use super::*;
     use std::sync::Arc;

-    use mz_persist::location::SeqNo;
+    use mz_persist::location::{Blob, SeqNo};
+    use mz_persist_types::codec_impls::StringSchema;
     use timely::dataflow::operators::Leave;
     use timely::dataflow::operators::Probe;
     use timely::dataflow::operators::capture::{Capture, Event as CaptureEvent};
     use timely::dataflow::operators::probe::Handle as ProbeHandle;
     use timely::progress::Antichain;

+    use crate::batch::{INLINE_WRITES_SINGLE_MAX_BYTES, INLINE_WRITES_TOTAL_MAX_BYTES};
+    use crate::cache::PersistClientCache;
+    use crate::internal::paths::PartialBlobKey;
     use crate::operators::shard_source::shard_source;
-    use crate::{Diagnostics, ShardId};
+    use crate::{Diagnostics, PersistLocation, ShardId};

     #[mz_ore::test]
     fn test_lease_manager() {
@@ -1488,6 +1492,126 @@ mod tests {
         assert_eq!(frontier, Antichain::from_elem(1));
     }

+    /// Regression test for the `shard_source_fetch` freeze path: a blob that
+    /// goes missing while *fetching* (the listing path is covered by
+    /// `test_shard_source_error_freeze`) must freeze the output frontier at the
+    /// missing part, not keep draining later results — which would match them
+    /// to earlier capabilities and advance the frontier past data never emitted.
+    ///
+    /// With `as_of = 0` over three batches, the descs operator emits parts at
+    /// t=0 (snapshot + first listen), t=1, and t=2. Deleting the t=1 batch's
+    /// blob makes only its fetch fail: a frozen source keeps the frontier at 1,
+    /// while the bug drains the t=2 result against the t=1 capability, advancing
+    /// it to 2.
+    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
+    #[cfg_attr(miri, ignore)] // too slow
+    async fn test_shard_source_fetch_error_freeze() {
+        // Force writes to real blobs (inline parts have no blob to delete) and
+        // disable compaction so the three batches stay distinct and deletable.
+        let mut cache = PersistClientCache::new_no_metrics();
+        cache.cfg.compaction_enabled = false;
+        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
+        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
+        let persist_client = cache
+            .open(PersistLocation::new_in_mem())
+            .await
+            .expect("in-mem location is valid");
+        let shard_id = ShardId::new();
+        // Clones of a `PersistClient` share the blob `Arc`, so deleting via this
+        // handle is visible to the reader the source opens.
+        let blob = Arc::clone(&persist_client.blob);
+
+        let mut write = persist_client
+            .open_writer::<String, String, u64, u64>(
+                shard_id,
+                Arc::new(StringSchema),
+                Arc::new(StringSchema),
+                Diagnostics::for_tests(),
+            )
+            .await
+            .expect("invalid usage");
+
+        // The data-part (non-rollup) blob keys currently present.
+        async fn batch_keys(blob: &dyn Blob) -> std::collections::BTreeSet<String> {
+            let mut keys = std::collections::BTreeSet::new();
+            blob.list_keys_and_metadata("", &mut |meta| {
+                if let Ok((_, PartialBlobKey::Batch(..))) = BlobKey::parse_ids(meta.key) {
+                    keys.insert(meta.key.to_owned());
+                }
+            })
+            .await
+            .expect("list keys");
+            keys
+        }
+
+        let row = |t: u64| ((format!("k{t}"), format!("v{t}")), t, 1u64);
+        write.expect_compare_and_append(&[row(0)], 0, 1).await;
+        let before = batch_keys(blob.as_ref()).await;
+        write.expect_compare_and_append(&[row(1)], 1, 2).await;
+        let after = batch_keys(blob.as_ref()).await;
+        write.expect_compare_and_append(&[row(2)], 2, 3).await;
+
+        // Delete exactly the middle (t=1) batch's data part(s).
+        let missing: Vec<_> = after.difference(&before).cloned().collect();
+        assert!(!missing.is_empty(), "middle batch wrote no blob part");
+        for key in &missing {
+            blob.delete(key).await.expect("delete");
+        }
+
+        let frontier = timely::execute::execute_directly(move |worker| {
+            let errored = Rc::new(std::cell::Cell::new(false));
+            let error_handler = ErrorHandler::signal({
+                let errored = Rc::clone(&errored);
+                move |_err| errored.set(true)
+            });
+
+            let (probe, _token) = worker.dataflow::<u64, _, _>(|outer| {
+                let (stream, token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
+                    let (stream, tokens) = shard_source::<String, String, u64, u64, _, _, _>(
+                        outer,
+                        scope,
+                        "test_source",
+                        move || std::future::ready(persist_client.clone()),
+                        shard_id,
+                        Some(Antichain::from_elem(0)),
+                        SnapshotMode::Include,
+                        Antichain::new(),
+                        Some(move |_, descs, _| (descs, vec![])),
+                        Arc::new(StringSchema),
+                        Arc::new(StringSchema),
+                        FilterResult::keep_all,
+                        false.then_some(|| unreachable!()),
+                        async {},
+                        error_handler,
+                    );
+                    (stream.leave(outer), tokens)
+                });
+                let probe = ProbeHandle::new();
+                stream.probe_with(&probe);
+                (probe, token)
+            });
+
+            // Step until the fetch error fires, then keep stepping: the source
+            // must stay frozen at the missing part and not advance onto the
+            // later, fetchable part.
+            let deadline = Instant::now() + std::time::Duration::from_secs(60);
+            while !errored.get() {
+                assert!(
+                    Instant::now() < deadline,
+                    "timed out waiting for fetch error"
+                );
+                worker.step();
+            }
+            for _ in 0..100 {
+                worker.step();
+            }
+            probe.with_frontier(|f| f.to_owned())
+        });
+
+        // Frozen at the missing part (t=1); the bug advanced this to 2.
+        assert_eq!(frontier, Antichain::from_elem(1));
+    }
+
     async fn initialize_shard(
         persist_client: &PersistClient,
         shard_id: ShardId,

Running cargo test -p mz-persist-client --lib operators::shard_source::tests::test_shard_source_fetch_error_freeze -- --exact fails:

thread 'operators::shard_source::tests::test_shard_source_fetch_error_freeze' (106186) panicked at src/persist-client/src/operators/shard_source.rs:1612:9:
assertion `left == right` failed
  left: Antichain { elements: [2] }
 right: Antichain { elements: [1] }
stack backtrace:
   0: __rustc::rust_begin_unwind
   1: core::panicking::panic_fmt
   2: core::panicking::assert_failed_inner
   3: core::panicking::assert_failed::<timely::progress::frontier::Antichain<u64>, timely::progress::frontier::Antichain<u64>>
   4: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0}::test_impl::{closure#0}
   5: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0}
   6: <core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>> as core::future::future::Future>::poll
   7: <tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>> as core::future::future::Future>::poll
   8: <tokio::runtime::park::CachedParkThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>::{closure#0}
   9: <tokio::runtime::park::CachedParkThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>
  10: <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>
  11: <tokio::runtime::scheduler::multi_thread::MultiThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>::{closure#0}
  12: tokio::runtime::context::runtime::enter_runtime::<<tokio::runtime::scheduler::multi_thread::MultiThread>::block_on<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>::{closure#0}, ()>
  13: <tokio::runtime::scheduler::multi_thread::MultiThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>
  14: <tokio::runtime::runtime::Runtime>::block_on_inner::<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>
  15: <tokio::runtime::runtime::Runtime>::block_on::<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>
  16: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze
  17: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0}
  18: <mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0} as core::ops::function::FnOnce<()>>::call_once
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
test operators::shard_source::tests::test_shard_source_fetch_error_freeze ... FAILED

Based on https://github.com/MaterializeInc/qa-llm-review/blob/master/commit-bugs/done/analysis-pr-36910.md

antiguru added a commit to antiguru/materialize that referenced this pull request Jun 9, 2026
On a fetch error the operator set `failed` and broke out of the drain
loop, but kept re-entering it on later activations. A subsequently
fetched part's result would then pop the *failed* part's capability off
the front of `inflight_caps` (FIFO) and emit at it, advancing the output
frontier past data that was never produced — defeating the freeze.

Return early once frozen so every outstanding capability is retained and
the frontier stays at the missing part. Adds a regression test that
deletes the first batch's blob and steps until the dataflow quiesces, so
the later parts' fetch results are produced and would be drained under
the bug; it fails deterministically without the fix.

Found by qa-llm-review on MaterializeInc#36910.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
antiguru force-pushed the persist-source-deasync-design branch from fb159d7 to 94467ce on June 9, 2026 11:23

antiguru commented Jun 9, 2026

Member Author

@def- thank you, fixed this issue.

antiguru closed this on June 15, 2026
antiguru reopened this on June 16, 2026
antiguru force-pushed the persist-source-deasync-design branch from 94467ce to 02b1bad on June 16, 2026 08:57
antiguru changed the title from "persist: deasync shard_source operators, bound non-cc fetches" to "persist: deasync shard_source operators" on June 16, 2026
Comment on lines +987 to +989
let fetched_cap = cap.delayed(cap.time(), 0);
let completed_cap = cap.delayed(cap.time(), 1);
inflight_caps.push_back((fetched_cap, completed_cap));

antiguru commented Jun 16, 2026

Member Author

This implies a FIFO ordering between desc_tx and blob_rx. Is this intended? A more robust implementation would manage a ChangeBatch/MutableAntichain that counts how many outstanding blobs it expects at each concrete timestamp. We'd need to send concrete timestamps to the task so it can tell us at which capability to emit each blob to the next operator.

Member Author

Agreed — the FIFO coupling was sound only because the task is a strictly sequential loop, and it would misassign the moment the task fetched concurrently (releasing a part's completed-fetches capability, and its lease, for a part not actually fetched). Reworked it as you suggested in ffa9df5.

Each desc now carries the time it was minted at to the task, which echoes it back with the result. The operator tracks outstanding fetches in a per-time BTreeMap<TInner, (data_cap, completed_cap, count)>: emit each blob at its time's capability, and drop both capabilities when a time's count reaches zero. No dependence on result order, and it lines up with the descs-side LeaseManager, which is already keyed by time.

A nice side effect: the freeze-on-error is now structural rather than relying on the early return. The failed time's count is never decremented, so its capabilities are retained and the frontier holds there no matter what other times complete. The FIFO mis-pop class of bug (the earlier def- finding) therefore can't recur. All 7 shard_source tests and the txn-wal stress test still pass.
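
A rough sketch of the per-time bookkeeping, under simplifying assumptions: `Inflight` is a hypothetical type, times are plain `u64`, and the map value is only the count (the real entry also holds the data and completed-fetches capabilities, which are dropped when the entry is removed).

```rust
use std::collections::BTreeMap;

/// Outstanding fetches keyed by the time each desc was minted at.
#[derive(Default)]
pub struct Inflight {
    by_time: BTreeMap<u64, usize>,
}

impl Inflight {
    /// A desc minted at `time` was handed to the fetch task.
    pub fn sent(&mut self, time: u64) {
        *self.by_time.entry(time).or_insert(0) += 1;
    }

    /// The task echoed `time` back together with the fetch result.
    pub fn completed(&mut self, time: u64, result: Result<(), String>) {
        if result.is_err() {
            // Never decremented: this time's entry pins the frontier.
            return;
        }
        if let Some(count) = self.by_time.get_mut(&time) {
            *count -= 1;
            if *count == 0 {
                // Stand-in for dropping both capabilities at this time.
                self.by_time.remove(&time);
            }
        }
    }

    /// Frontier in this model: the earliest time with outstanding fetches.
    pub fn frontier(&self) -> Option<u64> {
        self.by_time.keys().next().copied()
    }
}
```

Results may arrive in any order here: completing time 2 before time 0 leaves the frontier at 0, and a failed fetch at time 1 keeps the frontier at 1 even after time 0 completes.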

antiguru and others added 4 commits June 16, 2026 13:27
Convert shard_source_descs and shard_source_fetch from
AsyncOperatorBuilder to synchronous OperatorBuilderRc operators paired
with tokio tasks that own the persist I/O (reader/snapshot/listen and
batch fetching). This drops the persist source's dependence on the
timely async bridge with no behavior change.

* shard_source_descs runs a listen task on the chosen worker that sends
  parts (split into ExchangeableBatchPart + Lease) and progress over a
  channel; the operator downgrades capabilities and parks leases. The
  former shard_source_descs_return operator is merged in as a
  disconnected completed_fetches input, and the listen handle (the
  reader's SeqNo hold) is released via a oneshot once that frontier
  empties.
* shard_source_fetch forwards descs to a fetch task and retains a
  per-flight capability pair; results are matched FIFO, emitted at the
  data capability, with the completed-fetches capability dropped to
  release the lease. On a missing blob it reports through the
  ErrorHandler and freezes, retaining capabilities (and crucially
  ceasing to drain results, so a later good result cannot advance the
  frontier past the missing part).
* Both operators reproduce builder_async's two-phase shutdown via
  build_reschedule + the coordinated button, so a local-only press
  cannot advance the downstream frontier past times other workers still
  feed.

Adds ErrorHandler::report_and_freeze, module documentation of the
consumer contract and the operator/task architecture, and regression
tests for end-to-end fetch, mid-stream shutdown, listing-path error
freeze, and fetch-path error freeze.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
With the persist source's listen polling moved into a tokio task, a
fully caught-up DataSubscribe with no listen retry timer produces no
worker activations. The stress test's read workers parked indefinitely
on step_or_park(None) while polling a oneshot, so they never observed
the shutdown signal. Bound the parks so they re-check between steps.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
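
The effect of bounding the parks can be shown with a plain thread. This is only an analogy, assuming a hypothetical `run_worker` loop in which `std::thread::park_timeout` plays the role of the bounded park and an `AtomicBool` plays the role of the shutdown oneshot; the stress test itself presumably passes a `Some(..)` duration to `step_or_park` instead.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Stand-in for a read worker: do one "step", then park for at most `park`
/// so the shutdown flag is re-checked even if nothing ever wakes the thread.
/// Returns the number of loop iterations performed.
pub fn run_worker(shutdown: Arc<AtomicBool>, park: Duration) -> usize {
    let mut steps = 0;
    while !shutdown.load(Ordering::SeqCst) {
        steps += 1;
        // An unbounded `thread::park()` here could sleep forever once the
        // source is caught up and produces no further wakeups.
        thread::park_timeout(park);
    }
    steps
}
```

Setting the flag from another thread without ever calling `unpark` is enough for `run_worker` to return, because each park expires on its own.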
The lease-return input now lives on the shard_source_descs operator, so
introspection names that operator instead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The fetch operator matched results to capabilities by arrival order: it
pushed a capability pair per desc into a VecDeque and popped the front
for each result. That was sound only because the fetch task is a strictly
sequential loop, and it would silently misassign capabilities the moment
the task fetched concurrently — releasing a part's completed-fetches
capability (and its lease) for a part not actually fetched.

Tag each desc with the time it was minted at, echo that time back with
the result, and track outstanding fetches in a per-time BTreeMap of
(data cap, completed cap, count). Emit each blob at its time's capability
and drop both capabilities when a time's count reaches zero. This is
independent of the order results return in, and it aligns with the
descs-side LeaseManager, which is also keyed by time. As a bonus the
freeze-on-error is now structural: the failed time's capability is never
decremented, so the frontier holds there regardless of what else drains.

Per review feedback on MaterializeInc#36910.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
antiguru force-pushed the persist-source-deasync-design branch from 02b1bad to ffa9df5 on June 16, 2026 11:28

2 participants