persist: deasync shard_source operators #36910
}

if failed {
    return;
Do we still need to drain the msg_rx?
No: every error path in the listen task sends Error as its final message and returns, so once the operator has observed an Error the channel is provably empty forever. Added a comment documenting the invariant (8f38bab).
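The invariant can be illustrated with standard-library channels. This is a minimal sketch, not the listen task itself: `Msg` and `spawn_listen` are hypothetical names, a thread stands in for the tokio task, and `std::sync::mpsc` stands in for whatever channel the operator actually uses.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical message type: either a part at some time, or a terminal error.
enum Msg {
    Part(u64),
    Error(String),
}

/// Spawns a producer that forwards parts until the first error. The error is
/// sent as the final message and the producer returns, dropping its sender.
fn spawn_listen(parts: Vec<Result<u64, String>>) -> Receiver<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for part in parts {
            match part {
                Ok(time) => {
                    if tx.send(Msg::Part(time)).is_err() {
                        return; // receiver went away
                    }
                }
                Err(err) => {
                    // Every error path ends here: send `Error`, then return.
                    let _ = tx.send(Msg::Error(err));
                    return;
                }
            }
        }
    });
    rx
}
```

Because the producer returns right after sending `Error`, nothing can follow it on the channel, so a consumer that has seen `Error` has nothing left to drain.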
Good catch — this was a real bug, fixed in 94467ce (rebased onto current main).
On a fetch error the operator set failed and broke out of the drain loop, but kept re-entering it on subsequent activations. A later, successfully-fetched part's Ok result would then pop the failed part's capability off the front of inflight_caps (FIFO) and emit at it, advancing the output frontier past the missing part and defeating the freeze. The fix returns early once frozen, retaining every outstanding capability.
I adapted your reproduction: it deletes the first batch's blob and steps until the dataflow quiesces (with brief parks so the tokio fetch task finishes), rather than a fixed iteration count — otherwise it races the task and only catches the bug ~13% of the time. It now fails deterministically without the fix and passes with it.
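The mis-pop can be reproduced in isolation with standard-library types. The sketch below is a toy model under stated assumptions, not the operator's code: `Outcome` and `drain` are hypothetical names, plain `u64` times stand in for timely capabilities, and the held frontier is modeled as the earliest time still queued.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a fetch result delivered by the task.
#[derive(Clone, Copy)]
enum Outcome {
    Ok,
    Err,
}

/// Drains `results` against a FIFO of held times and returns the earliest
/// time still held, which stands in for the output frontier.
/// With `return_when_frozen = false` this models the bug: the error is
/// recorded, but later activations keep popping the queue.
fn drain(times: &[u64], results: &[Outcome], return_when_frozen: bool) -> Option<u64> {
    let mut inflight: VecDeque<u64> = times.iter().copied().collect();
    let mut failed = false;
    // Each result models one activation of the operator.
    for outcome in results {
        if failed && return_when_frozen {
            break; // the fix: never touch `inflight` again once frozen
        }
        match outcome {
            Outcome::Ok => {
                inflight.pop_front();
            }
            // The failed part's time stays at the front of the queue.
            Outcome::Err => failed = true,
        }
    }
    inflight.front().copied()
}
```

With times `[0, 1, 2]` and results `[Ok, Err, Ok]`, the buggy variant pops the time of the failed part when the last `Ok` arrives and ends at 2, while the early-return variant stays at 1, matching the two frontiers seen in the regression test.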
aec4880 to 8f38bab
def- left a comment
Claude found this issue:
diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs
index bb4da7fae9..c861e66d8f 100644
--- a/src/persist-client/src/operators/shard_source.rs
+++ b/src/persist-client/src/operators/shard_source.rs
@@ -1055,15 +1055,19 @@ mod tests {
use super::*;
use std::sync::Arc;
- use mz_persist::location::SeqNo;
+ use mz_persist::location::{Blob, SeqNo};
+ use mz_persist_types::codec_impls::StringSchema;
use timely::dataflow::operators::Leave;
use timely::dataflow::operators::Probe;
use timely::dataflow::operators::capture::{Capture, Event as CaptureEvent};
use timely::dataflow::operators::probe::Handle as ProbeHandle;
use timely::progress::Antichain;
+ use crate::batch::{INLINE_WRITES_SINGLE_MAX_BYTES, INLINE_WRITES_TOTAL_MAX_BYTES};
+ use crate::cache::PersistClientCache;
+ use crate::internal::paths::PartialBlobKey;
use crate::operators::shard_source::shard_source;
- use crate::{Diagnostics, ShardId};
+ use crate::{Diagnostics, PersistLocation, ShardId};
#[mz_ore::test]
fn test_lease_manager() {
@@ -1488,6 +1492,126 @@ mod tests {
assert_eq!(frontier, Antichain::from_elem(1));
}
+ /// Regression test for the `shard_source_fetch` freeze path: a blob that
+ /// goes missing while *fetching* (the listing path is covered by
+ /// `test_shard_source_error_freeze`) must freeze the output frontier at the
+ /// missing part, not keep draining later results — which would match them
+ /// to earlier capabilities and advance the frontier past data never emitted.
+ ///
+ /// With `as_of = 0` over three batches, the descs operator emits parts at
+ /// t=0 (snapshot + first listen), t=1, and t=2. Deleting the t=1 batch's
+ /// blob makes only its fetch fail: a frozen source keeps the frontier at 1,
+ /// while the bug drains the t=2 result against the t=1 capability, advancing
+ /// it to 2.
+ #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
+ #[cfg_attr(miri, ignore)] // too slow
+ async fn test_shard_source_fetch_error_freeze() {
+ // Force writes to real blobs (inline parts have no blob to delete) and
+ // disable compaction so the three batches stay distinct and deletable.
+ let mut cache = PersistClientCache::new_no_metrics();
+ cache.cfg.compaction_enabled = false;
+ cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
+ cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
+ let persist_client = cache
+ .open(PersistLocation::new_in_mem())
+ .await
+ .expect("in-mem location is valid");
+ let shard_id = ShardId::new();
+ // Clones of a `PersistClient` share the blob `Arc`, so deleting via this
+ // handle is visible to the reader the source opens.
+ let blob = Arc::clone(&persist_client.blob);
+
+ let mut write = persist_client
+ .open_writer::<String, String, u64, u64>(
+ shard_id,
+ Arc::new(StringSchema),
+ Arc::new(StringSchema),
+ Diagnostics::for_tests(),
+ )
+ .await
+ .expect("invalid usage");
+
+ // The data-part (non-rollup) blob keys currently present.
+ async fn batch_keys(blob: &dyn Blob) -> std::collections::BTreeSet<String> {
+ let mut keys = std::collections::BTreeSet::new();
+ blob.list_keys_and_metadata("", &mut |meta| {
+ if let Ok((_, PartialBlobKey::Batch(..))) = BlobKey::parse_ids(meta.key) {
+ keys.insert(meta.key.to_owned());
+ }
+ })
+ .await
+ .expect("list keys");
+ keys
+ }
+
+ let row = |t: u64| ((format!("k{t}"), format!("v{t}")), t, 1u64);
+ write.expect_compare_and_append(&[row(0)], 0, 1).await;
+ let before = batch_keys(blob.as_ref()).await;
+ write.expect_compare_and_append(&[row(1)], 1, 2).await;
+ let after = batch_keys(blob.as_ref()).await;
+ write.expect_compare_and_append(&[row(2)], 2, 3).await;
+
+ // Delete exactly the middle (t=1) batch's data part(s).
+ let missing: Vec<_> = after.difference(&before).cloned().collect();
+ assert!(!missing.is_empty(), "middle batch wrote no blob part");
+ for key in &missing {
+ blob.delete(key).await.expect("delete");
+ }
+
+ let frontier = timely::execute::execute_directly(move |worker| {
+ let errored = Rc::new(std::cell::Cell::new(false));
+ let error_handler = ErrorHandler::signal({
+ let errored = Rc::clone(&errored);
+ move |_err| errored.set(true)
+ });
+
+ let (probe, _token) = worker.dataflow::<u64, _, _>(|outer| {
+ let (stream, token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
+ let (stream, tokens) = shard_source::<String, String, u64, u64, _, _, _>(
+ outer,
+ scope,
+ "test_source",
+ move || std::future::ready(persist_client.clone()),
+ shard_id,
+ Some(Antichain::from_elem(0)),
+ SnapshotMode::Include,
+ Antichain::new(),
+ Some(move |_, descs, _| (descs, vec![])),
+ Arc::new(StringSchema),
+ Arc::new(StringSchema),
+ FilterResult::keep_all,
+ false.then_some(|| unreachable!()),
+ async {},
+ error_handler,
+ );
+ (stream.leave(outer), tokens)
+ });
+ let probe = ProbeHandle::new();
+ stream.probe_with(&probe);
+ (probe, token)
+ });
+
+ // Step until the fetch error fires, then keep stepping: the source
+ // must stay frozen at the missing part and not advance onto the
+ // later, fetchable part.
+ let deadline = Instant::now() + std::time::Duration::from_secs(60);
+ while !errored.get() {
+ assert!(
+ Instant::now() < deadline,
+ "timed out waiting for fetch error"
+ );
+ worker.step();
+ }
+ for _ in 0..100 {
+ worker.step();
+ }
+ probe.with_frontier(|f| f.to_owned())
+ });
+
+ // Frozen at the missing part (t=1); the bug advanced this to 2.
+ assert_eq!(frontier, Antichain::from_elem(1));
+ }
+
async fn initialize_shard(
persist_client: &PersistClient,
 shard_id: ShardId,

Running `cargo test -p mz-persist-client --lib source-deasync-design operators::shard_source::tests::test_shard_source_fetch_error_freeze -- --exact` fails:
thread 'operators::shard_source::tests::test_shard_source_fetch_error_freeze' (106186) panicked at src/persist-client/src/operators/shard_source.rs:1612:9:
assertion `left == right` failed
left: Antichain { elements: [2] }
right: Antichain { elements: [1] }
stack backtrace:
0: __rustc::rust_begin_unwind
1: core::panicking::panic_fmt
2: core::panicking::assert_failed_inner
3: core::panicking::assert_failed::<timely::progress::frontier::Antichain<u64>, timely::progress::frontier::Antichain<u64>>
4: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0}::test_impl::{closure#0}
5: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0}
6: <core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>> as core::future::future::Future>::poll
7: <tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>> as core::future::future::Future>::poll
8: <tokio::runtime::park::CachedParkThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>::{closure#0}
9: <tokio::runtime::park::CachedParkThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>
10: <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>
11: <tokio::runtime::scheduler::multi_thread::MultiThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>::{closure#0}
12: tokio::runtime::context::runtime::enter_runtime::<<tokio::runtime::scheduler::multi_thread::MultiThread>::block_on<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>::{closure#0}, ()>
13: <tokio::runtime::scheduler::multi_thread::MultiThread>::block_on::<tracing::instrument::Instrumented<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>>
14: <tokio::runtime::runtime::Runtime>::block_on_inner::<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>
15: <tokio::runtime::runtime::Runtime>::block_on::<core::pin::Pin<&mut dyn core::future::future::Future<Output = ()>>>
16: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze
17: mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0}
18: <mz_persist_client::operators::shard_source::tests::test_shard_source_fetch_error_freeze::{closure#0} as core::ops::function::FnOnce<()>>::call_once
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
test operators::shard_source::tests::test_shard_source_fetch_error_freeze ... FAILED
Based on https://github.com/MaterializeInc/qa-llm-review/blob/master/commit-bugs/done/analysis-pr-36910.md
On a fetch error the operator set `failed` and broke out of the drain loop, but kept re-entering it on later activations. A subsequently fetched part's result would then pop the *failed* part's capability off the front of `inflight_caps` (FIFO) and emit at it, advancing the output frontier past data that was never produced, defeating the freeze.

Return early once frozen so every outstanding capability is retained and the frontier stays at the missing part.

Adds a regression test that deletes the first batch's blob and steps until the dataflow quiesces, so the later parts' fetch results are produced and would be drained under the bug; it fails deterministically without the fix.

Found by qa-llm-review on MaterializeInc#36910.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
fb159d7 to 94467ce
@def- thank you, fixed this issue.
94467ce to 02b1bad
let fetched_cap = cap.delayed(cap.time(), 0);
let completed_cap = cap.delayed(cap.time(), 1);
inflight_caps.push_back((fetched_cap, completed_cap));
This implies a FIFO ordering between desc_tx and blob_rx, is this intended? A more robust implementation would manage a ChangeBatch/MutableAntichain that counts how many outstanding blobs it expected at concrete timestamps. We'd need to send concrete timestamps to the task so it can tell us at which cap to send a blob to the next operator.
Agreed — the FIFO coupling was sound only because the task is a strictly sequential loop, and it would misassign the moment the task fetched concurrently (releasing a part's completed-fetches capability, and its lease, for a part not actually fetched). Reworked it as you suggested in ffa9df5.
Each desc now carries the time it was minted at to the task, which echoes it back with the result. The operator tracks outstanding fetches in a per-time BTreeMap<TInner, (data_cap, completed_cap, count)>: emit each blob at its time's capability, and drop both capabilities when a time's count reaches zero. No dependence on result order, and it lines up with the descs-side LeaseManager, which is already keyed by time.
A nice side effect: the freeze-on-error is now structural rather than relying on the early-return. The failed time's capability is never decremented, so the frontier holds there no matter what other times complete — so the FIFO mis-pop class of bug (the earlier def- finding) can't recur. All 7 shard_source tests and the txn-wal stress test still pass.
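The per-time bookkeeping can be sketched with a plain `BTreeMap`. This is an illustration under stated assumptions rather than the code in ffa9df5: `Outstanding` and its methods are hypothetical names, times are `u64`, and only the count is modeled (the real entry also holds the data and completed-fetches capabilities).

```rust
use std::collections::BTreeMap;

/// Toy tracker for outstanding fetches, keyed by the time each desc was
/// minted at. Removing an entry models dropping both capabilities.
#[derive(Default)]
struct Outstanding {
    by_time: BTreeMap<u64, usize>,
}

impl Outstanding {
    /// A desc at `time` was handed to the fetch task.
    fn sent(&mut self, time: u64) {
        *self.by_time.entry(time).or_insert(0) += 1;
    }

    /// A successful result tagged with `time` came back. Once the count for
    /// that time reaches zero, the entry is removed.
    fn fetched(&mut self, time: u64) {
        if let Some(count) = self.by_time.get_mut(&time) {
            *count -= 1;
            if *count == 0 {
                self.by_time.remove(&time);
            }
        }
    }

    /// The earliest time still held, standing in for the output frontier.
    fn frontier(&self) -> Option<u64> {
        self.by_time.keys().next().copied()
    }
}
```

A failed fetch simply never calls `fetched` for its time, so that entry stays in the map and `frontier` cannot move past it, whatever order the other results arrive in.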
Convert shard_source_descs and shard_source_fetch from AsyncOperatorBuilder to synchronous OperatorBuilderRc operators paired with tokio tasks that own the persist I/O (reader/snapshot/listen and batch fetching). This drops the persist source's dependence on the timely async bridge with no behavior change.

* shard_source_descs runs a listen task on the chosen worker that sends parts (split into ExchangeableBatchPart + Lease) and progress over a channel; the operator downgrades capabilities and parks leases. The former shard_source_descs_return operator is merged in as a disconnected completed_fetches input, and the listen handle (the reader's SeqNo hold) is released via a oneshot once that frontier empties.
* shard_source_fetch forwards descs to a fetch task and retains a per-flight capability pair; results are matched FIFO, emitted at the data capability, with the completed-fetches capability dropped to release the lease. On a missing blob it reports through the ErrorHandler and freezes, retaining capabilities (and crucially ceasing to drain results, so a later good result cannot advance the frontier past the missing part).
* Both operators reproduce builder_async's two-phase shutdown via build_reschedule + the coordinated button, so a local-only press cannot advance the downstream frontier past times other workers still feed.

Adds ErrorHandler::report_and_freeze, module documentation of the consumer contract and the operator/task architecture, and regression tests for end-to-end fetch, mid-stream shutdown, listing-path error freeze, and fetch-path error freeze.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
With the persist source's listen polling moved into a tokio task, a fully caught-up DataSubscribe with no listen retry timer produces no worker activations. The stress test's read workers parked indefinitely on step_or_park(None) while polling a oneshot, so they never observed the shutdown signal. Bound the parks so they re-check between steps. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
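The bounded-park pattern can be sketched as a small loop. This is an illustration, not the stress test's code: `run_until_shutdown` is a hypothetical helper, the `step_or_park` closure stands in for the worker's stepping call, a `std::sync::mpsc` receiver stands in for the oneshot, and the 10 ms bound is an arbitrary choice.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};
use std::time::Duration;

/// Steps until a shutdown signal arrives and returns the number of steps.
/// The stepping call always gets a bounded timeout, so the loop re-checks
/// the signal even when the dataflow produces no activations.
fn run_until_shutdown(
    shutdown: &Receiver<()>,
    mut step_or_park: impl FnMut(Option<Duration>),
) -> usize {
    let mut steps = 0;
    loop {
        match shutdown.try_recv() {
            // Signal received, or the sender went away: stop stepping.
            Ok(()) | Err(TryRecvError::Disconnected) => return steps,
            Err(TryRecvError::Empty) => {}
        }
        // Passing `None` here could park forever on an idle dataflow.
        step_or_park(Some(Duration::from_millis(10)));
        steps += 1;
    }
}
```

The trade-off is a periodic wake-up on an otherwise idle worker in exchange for a bounded delay before the shutdown signal is observed.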
The lease-return input now lives on the shard_source_descs operator, so introspection names that operator instead. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The fetch operator matched results to capabilities by arrival order: it pushed a capability pair per desc into a VecDeque and popped the front for each result. That was sound only because the fetch task is a strictly sequential loop, and it would silently misassign capabilities the moment the task fetched concurrently, releasing a part's completed-fetches capability (and its lease) for a part not actually fetched.

Tag each desc with the time it was minted at, echo that time back with the result, and track outstanding fetches in a per-time BTreeMap of (data cap, completed cap, count). Emit each blob at its time's capability and drop both capabilities when a time's count reaches zero. This is independent of the order results return in, and it aligns with the descs-side LeaseManager, which is also keyed by time.

As a bonus the freeze-on-error is now structural: the failed time's capability is never decremented, so the frontier holds there regardless of what else drains.

Per review feedback on MaterializeInc#36910.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
02b1bad to ffa9df5
Motivation
The persist source operators are built on the timely async bridge, which we are migrating away from (cf. #36810). This converts them to synchronous operators with no behavior change.
(This PR previously also changed the fetch-semaphore gating; that has been dropped to keep it a single-purpose deasync. The memory/backpressure work is tracked separately.)
Description
Convert `shard_source_descs` and `shard_source_fetch` from `AsyncOperatorBuilder` to synchronous `OperatorBuilderRc` operators, each paired with a tokio task that owns the persist I/O.

* `shard_source_descs` runs a listen task on the chosen worker that owns the reader, snapshot, listen loop, and stats filtering, and sends parts (split into `ExchangeableBatchPart` + `Lease`) plus progress over a channel. The former `shard_source_descs_return` operator is merged in as a disconnected `completed_fetches` input; the listen handle (the reader's SeqNo hold) is released via a oneshot once that frontier empties, preserving the old lease lifetime.
* `shard_source_fetch` forwards descs to a fetch task and retains a per-flight capability pair; results are matched FIFO, emitted at the data capability, with the completed-fetches capability dropped to release the lease only after the download completes. On a missing blob it reports through the `ErrorHandler` and freezes, retaining capabilities and ceasing to drain results, so a later good result cannot advance the frontier past the missing part.
* Both operators reproduce `builder_async`'s two-phase shutdown (`build_reschedule` + the coordinated button), so a local-only press cannot advance the downstream frontier past times other workers' instances still feed.
* `filter_fn`, `listen_sleep`, and `start_signal` gain `Send` bounds; all callers already satisfy them.

Verification
New `shard_source` unit tests: end-to-end fetch, mid-stream shutdown, listing-path error freeze, and fetch-path error freeze (deterministic via quiesce; confirmed to fail without the freeze fix). Existing persist-client, txn-wal (incl. `stress_correctness`), and the `relations.slt` introspection test pass locally.

🤖 Generated with Claude Code