From ee080179b9df81de6bac81202f729ccb86192976 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:12:29 -0400 Subject: [PATCH 01/15] Remove commented code --- differential-dataflow/src/operators/reduce.rs | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 77834ab42..6433a3d33 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -479,18 +479,6 @@ mod history_replay { let mut meet = None; update_meet(&mut meet, self.meets.get(0)); update_meet(&mut meet, batch_replay.meet()); - // if let Some(time) = self.meets.get(0) { - // meet = match meet { - // None => Some(self.meets[0].clone()), - // Some(x) => Some(x.meet(&self.meets[0])), - // }; - // } - // if let Some(time) = batch_replay.meet() { - // meet = match meet { - // None => Some(time.clone()), - // Some(x) => Some(x.meet(&time)), - // }; - // } // Having determined the meet, we can load the input and output histories, where we // advance all times by joining them with `meet`. The resulting times are more compact @@ -643,24 +631,6 @@ mod history_replay { self.output_buffer.clear(); } - // output_replay.advance_buffer_by(&meet); - // for &((ref value, ref time), diff) in output_replay.buffer().iter() { - // if time.less_equal(&next_time) { - // self.output_buffer.push(((*value).clone(), -diff)); - // } - // else { - // self.temporary.push(next_time.join(time)); - // } - // } - // for &((ref value, ref time), diff) in self.output_produced.iter() { - // if time.less_equal(&next_time) { - // self.output_buffer.push(((*value).clone(), -diff)); - // } - // else { - // self.temporary.push(next_time.join(&time)); - // } - // } - // Having subtracted output updates from user output, consolidate the results to determine // if there is anything worth reporting. Note: this also orders the results by value, so // that could make the above merging plan even easier. @@ -754,12 +724,7 @@ mod history_replay { update_meet(&mut meet, input_replay.meet()); update_meet(&mut meet, output_replay.meet()); for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); } - // if let Some(time) = batch_replay.meet() { meet = meet.meet(time); } - // if let Some(time) = input_replay.meet() { meet = meet.meet(time); } - // if let Some(time) = output_replay.meet() { meet = meet.meet(time); } - // for time in self.synth_times.iter() { meet = meet.meet(time); } update_meet(&mut meet, meets_slice.first()); - // if let Some(time) = meets_slice.first() { meet = meet.meet(time); } // Update `times_current` by the frontier. if let Some(meet) = meet.as_ref() { From 1535c4b0e3499d165b3de12b996e6e6e44f948c5 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:14:57 -0400 Subject: [PATCH 02/15] Removed one-off trait --- differential-dataflow/src/operators/reduce.rs | 37 ++----------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 6433a3d33..7e03139ff 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -347,35 +347,6 @@ fn sort_dedup(list: &mut Vec) { list.dedup(); } -trait PerKeyCompute<'a, C1, C2, C3, V> -where - C1: Cursor, - C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, - C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, - V: Clone + Ord, -{ - fn new() -> Self; - fn compute( - &mut self, - key: C1::Key<'a>, - source_cursor: (&mut C1, &'a C1::Storage), - output_cursor: (&mut C2, &'a C2::Storage), - batch_cursor: (&mut C3, &'a C3::Storage), - times: &mut Vec, - logic: &mut L, - upper_limit: &Antichain, - outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], - new_interesting: &mut Vec) -> (usize, usize) - where - L: FnMut( - C1::Key<'a>, - &[(C1::Val<'a>, C1::Diff)], - &mut Vec<(V, C2::Diff)>, - &mut Vec<(V, C2::Diff)>, - ); -} - - /// Implementation based on replaying historical and new updates together. mod history_replay { @@ -386,7 +357,7 @@ mod history_replay { use crate::trace::Cursor; use crate::operators::ValueHistory; - use super::{PerKeyCompute, sort_dedup}; + use super::sort_dedup; /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in /// time order, maintaining consolidated representations of updates with respect to future interesting times. @@ -410,14 +381,14 @@ mod history_replay { temporary: Vec, } - impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V> + impl<'a, C1, C2, C3, V> HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, { - fn new() -> Self { + pub fn new() -> Self { HistoryReplayer { input_history: ValueHistory::new(), output_history: ValueHistory::new(), @@ -433,7 +404,7 @@ mod history_replay { } } #[inline(never)] - fn compute( + pub fn compute( &mut self, key: C1::Key<'a>, (source_cursor, source_storage): (&mut C1, &'a C1::Storage), From e6af4581d4e838167880867a3f3298478b1444df Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:16:04 -0400 Subject: [PATCH 03/15] Remove unread counters --- differential-dataflow/src/operators/reduce.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 7e03139ff..fdf7bbec3 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -221,7 +221,7 @@ where sort_dedup(&mut interesting_times); // do the per-key computation. - let _counters = thinker.compute( + thinker.compute( key, (&mut source_cursor, source_storage), (&mut output_cursor, output_storage), @@ -414,7 +414,7 @@ mod history_replay { logic: &mut L, upper_limit: &Antichain, outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], - new_interesting: &mut Vec) -> (usize, usize) + new_interesting: &mut Vec) where L: FnMut( C1::Key<'a>, @@ -487,9 +487,6 @@ mod history_replay { let mut times_slice = ×[..]; let mut meets_slice = &self.meets[..]; - let mut compute_counter = 0; - let mut output_counter = 0; - // We have candidate times from `batch` and `times`, as well as times identified by either // `input` or `output`. Finally, we may have synthetic times produced as the join of times // we consider in the course of evaluation. As long as any of these times exist, we need to @@ -553,8 +550,6 @@ mod history_replay { // output produced. This sounds like a good test to have for debug builds! if interesting { - compute_counter += 1; - // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use). debug_assert!(self.input_buffer.is_empty()); meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet)); @@ -613,8 +608,6 @@ mod history_replay { // times. if !self.update_buffer.is_empty() { - output_counter += 1; - // We *should* be able to find a capability for `next_time`. Any thing else would // indicate a logical error somewhere along the way; either we release a capability // we should have kept, or we have computed the output incorrectly (or both!) @@ -709,8 +702,6 @@ mod history_replay { // Normalize the representation of `new_interesting`, deduplicating and ordering. sort_dedup(new_interesting); - - (compute_counter, output_counter) } } From 8f9e929d9bb68cd5b9fda98344830428ae51c492 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:19:16 -0400 Subject: [PATCH 04/15] Convert silent errors to panics --- differential-dataflow/src/operators/reduce.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index fdf7bbec3..2b4fa29c2 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -83,8 +83,6 @@ where let mut output_upper = Antichain::from_elem(::minimum()); let mut output_lower = Antichain::from_elem(::minimum()); - let id = scope.index(); - move |(input, _frontier), output| { // The `reduce` operator receives fully formed batches, which each serve as an indication @@ -301,15 +299,8 @@ where // Update `capabilities` to reflect interesting pairs described by `frontier`. let mut new_capabilities = Vec::new(); for time in frontier.borrow().iter() { - if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) { - new_capabilities.push(cap.delayed(time)); - } - else { - println!("{}:\tfailed to find capability less than new frontier time:", id); - println!("{}:\t time: {:?}", id, time); - println!("{}:\t caps: {:?}", id, capabilities); - println!("{}:\t uppr: {:?}", id, upper_limit); - } + let cap = capabilities.iter().find(|c| c.time().less_equal(time)).expect("failed to find capability"); + new_capabilities.push(cap.delayed(time)); } capabilities = new_capabilities; From 51aef09bcd12bda0c94526566c6af790d24601ce Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:27:13 -0400 Subject: [PATCH 05/15] Simplify logic --- differential-dataflow/src/operators/reduce.rs | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 2b4fa29c2..50b4d55ee 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -447,26 +447,18 @@ mod history_replay { // and guaranteed to accumulate identically for times greater or equal to `meet`. // Load the input and output histories. - let mut input_replay = if let Some(meet) = meet.as_ref() { - self.input_history.replay_key(source_cursor, source_storage, key, |time| { - let mut time = C1::owned_time(time); - time.join_assign(meet); - time - }) - } - else { - self.input_history.replay_key(source_cursor, source_storage, key, |time| C1::owned_time(time)) - }; - let mut output_replay = if let Some(meet) = meet.as_ref() { - self.output_history.replay_key(output_cursor, output_storage, key, |time| { - let mut time = C2::owned_time(time); - time.join_assign(meet); - time - }) - } - else { - self.output_history.replay_key(output_cursor, output_storage, key, |time| C2::owned_time(time)) - }; + let mut input_replay = + self.input_history.replay_key(source_cursor, source_storage, key, |time| { + let mut time = C1::owned_time(time); + if let Some(meet) = meet.as_ref() { time.join_assign(meet); } + time + }); + let mut output_replay = + self.output_history.replay_key(output_cursor, output_storage, key, |time| { + let mut time = C2::owned_time(time); + if let Some(meet) = meet.as_ref() { time.join_assign(meet); } + time + }); self.synth_times.clear(); self.times_current.clear(); From 3901fdb9b9928c9350b60e7e959c7928db7061c1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:30:22 -0400 Subject: [PATCH 06/15] More commented code removed --- differential-dataflow/src/operators/reduce.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 50b4d55ee..596c63e53 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -60,11 +60,8 @@ where let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); - // let mut output_trace = TraceRc::make_from(agent).0; *result_trace = Some(output_reader.clone()); - // let mut thinker1 = history_replay_prior::HistoryReplayer::::new(); - // let mut thinker = history_replay::HistoryReplayer::::new(); let mut new_interesting_times = Vec::::new(); // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, @@ -666,7 +663,7 @@ mod history_replay { // Update `meet` to track the meet of each source of times. - meet = None;//T::maximum(); + meet = None; update_meet(&mut meet, batch_replay.meet()); update_meet(&mut meet, input_replay.meet()); update_meet(&mut meet, output_replay.meet()); From b157359b286bec095611cfc5ba90033a58940cf3 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:30:56 -0400 Subject: [PATCH 07/15] Extract unconditional behavior --- differential-dataflow/src/operators/reduce.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 596c63e53..f9551a3a0 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -300,14 +300,11 @@ where new_capabilities.push(cap.delayed(time)); } capabilities = new_capabilities; - - // ensure that observed progress is reflected in the output. - output_writer.seal(upper_limit.clone()); - } - else { - output_writer.seal(upper_limit.clone()); } + // ensure that observed progress is reflected in the output. + output_writer.seal(upper_limit.clone()); + // We only anticipate future times in advance of `upper_limit`. source_trace.set_logical_compaction(upper_limit.borrow()); output_reader.set_logical_compaction(upper_limit.borrow()); From fadcd7b89b45040533ab007bc9a4a31d1817f715 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:35:13 -0400 Subject: [PATCH 08/15] Idiomatic Rust --- differential-dataflow/src/operators/reduce.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index f9551a3a0..23ad76094 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -529,7 +529,7 @@ mod history_replay { // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use). debug_assert!(self.input_buffer.is_empty()); - meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet)); + if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) }; for &((value, ref time), ref diff) in input_replay.buffer().iter() { if time.less_equal(&next_time) { self.input_buffer.push((value, diff.clone())); @@ -548,7 +548,7 @@ mod history_replay { } crate::consolidation::consolidate(&mut self.input_buffer); - meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet)); + if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) }; for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(value), diff.clone())); From 2ec3293b148b4556637a9ab4e9c4ae4cd5f6bb62 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 04:53:49 -0400 Subject: [PATCH 09/15] Less time cloning --- differential-dataflow/src/operators/reduce.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 23ad76094..3715d2843 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -473,7 +473,7 @@ mod history_replay { input_replay.time(), output_replay.time(), self.synth_times.last(), - ].iter().cloned().flatten().min().cloned() { + ].into_iter().flatten().min().cloned() { // Advance input and output history replayers. This marks applicable updates as active. input_replay.step_while_time_is(&next_time); @@ -511,7 +511,7 @@ mod history_replay { // and become the time itself. They may not equal the current time because whatever frontier we // are tracking may not have advanced far enough. // TODO: `batch_history` may or may not be super compact at this point, and so this check might - // yield false positives if not sufficiently compact. Maybe we should into this and see. + // yield false positives if not sufficiently compact. Maybe we should look into this and see. interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time)); interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time)); @@ -658,7 +658,6 @@ mod history_replay { debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - // Update `meet` to track the meet of each source of times. meet = None; update_meet(&mut meet, batch_replay.meet()); @@ -685,12 +684,8 @@ mod history_replay { /// Updates an optional meet by an optional time. fn update_meet(meet: &mut Option, other: Option<&T>) { if let Some(time) = other { - if let Some(meet) = meet.as_mut() { - *meet = meet.meet(time); - } - if meet.is_none() { - *meet = Some(time.clone()); - } + if let Some(meet) = meet.as_mut() { meet.meet_assign(time); } + else { *meet = Some(time.clone()); } } } } From 81533872e2d317ac8a89382c485d344106779085 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 05:09:47 -0400 Subject: [PATCH 10/15] Idiomatic capability use --- differential-dataflow/src/operators/reduce.rs | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 3715d2843..4e3695e8f 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -8,13 +8,11 @@ use timely::container::PushInto; use crate::Data; -use timely::order::PartialOrder; use timely::progress::frontier::Antichain; use timely::progress::Timestamp; use timely::dataflow::*; use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::operators::Capability; use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; @@ -67,7 +65,7 @@ where // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, // as well as capabilities for these times (or their lower envelope, at least). let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new(); - let mut capabilities = Vec::>::new(); + let mut capabilities = timely::dataflow::operators::CapabilitySet::::default(); // buffers and logic for computing per-key interesting times "efficiently". let mut interesting_times = Vec::::new(); @@ -119,10 +117,7 @@ where } // Ensure that `capabilities` covers the capability of the batch. - capabilities.retain(|cap| !capability.time().less_than(cap.time())); - if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) { - capabilities.push(capability.retain(0)); - } + capabilities.insert(capability.retain(0)); }); // Pull in any subsequent empty batches we believe to exist. @@ -287,19 +282,10 @@ where // uses exactly `upper_limit` to determine the upper bound. Good to check though. assert!(output_upper.borrow() == upper_limit.borrow()); - // Determine the frontier of our interesting times. + // Update `capabilities` to reflect interesting times. let mut frontier = Antichain::::new(); - for (_, time) in &interesting { - frontier.insert_ref(time); - } - - // Update `capabilities` to reflect interesting pairs described by `frontier`. - let mut new_capabilities = Vec::new(); - for time in frontier.borrow().iter() { - let cap = capabilities.iter().find(|c| c.time().less_equal(time)).expect("failed to find capability"); - new_capabilities.push(cap.delayed(time)); - } - capabilities = new_capabilities; + for (_, time) in &interesting { frontier.insert_ref(time); } + capabilities.downgrade(frontier); } // ensure that observed progress is reflected in the output. From d0215402dfbf9af407c9ba01c46d934e5aa39362 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 05:19:13 -0400 Subject: [PATCH 11/15] Ref keyword to borrow --- differential-dataflow/src/operators/reduce.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 4e3695e8f..44f3727a1 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -168,12 +168,9 @@ where let mut buffer = Bu::Input::default(); // cursors for navigating input and output traces. - let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); - let source_storage = &source_storage; - let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); - let output_storage = &output_storage; - let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); - let batch_storage = &batch_storage; + let (mut source_cursor, ref source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); + let (mut output_cursor, ref output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); + let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); let mut thinker = history_replay::HistoryReplayer::new(); From 7a0283486c5e4d03eea6952127e93cefa2c9dfaa Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 05:53:51 -0400 Subject: [PATCH 12/15] Further tightening --- differential-dataflow/src/operators/reduce.rs | 118 +++++++----------- 1 file changed, 43 insertions(+), 75 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 44f3727a1..650feeb0d 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -469,11 +469,7 @@ mod history_replay { // Advance batch history, and capture whether an update exists at `next_time`. let mut interesting = batch_replay.step_while_time_is(&next_time); - if interesting { - if let Some(meet) = meet.as_ref() { - batch_replay.advance_buffer_by(meet); - } - } + if interesting { if let Some(meet) = meet.as_ref() { batch_replay.advance_buffer_by(meet); } } // advance both `synth_times` and `times_slice`, marking this time interesting if in either. while self.synth_times.last() == Some(&next_time) { @@ -513,81 +509,62 @@ mod history_replay { // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use). debug_assert!(self.input_buffer.is_empty()); if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) }; - for &((value, ref time), ref diff) in input_replay.buffer().iter() { - if time.less_equal(&next_time) { - self.input_buffer.push((value, diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + for ((value, time), diff) in input_replay.buffer().iter() { + if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } - for &((value, ref time), ref diff) in batch_replay.buffer().iter() { - if time.less_equal(&next_time) { - self.input_buffer.push((value, diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + for ((value, time), diff) in batch_replay.buffer().iter() { + if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } crate::consolidation::consolidate(&mut self.input_buffer); + // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use). if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) }; - for &((value, ref time), ref diff) in output_replay.buffer().iter() { - if time.less_equal(&next_time) { - self.output_buffer.push((C2::owned_val(value), diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + for ((value, time), diff) in output_replay.buffer().iter() { + if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } - for &((ref value, ref time), ref diff) in self.output_produced.iter() { - if time.less_equal(&next_time) { - self.output_buffer.push(((*value).to_owned(), diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + for ((value, time), diff) in self.output_produced.iter() { + if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } crate::consolidation::consolidate(&mut self.output_buffer); - // Apply user logic if non-empty input and see what happens! + // Apply user logic if non-empty input or output and see what happens! if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() { logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer); self.input_buffer.clear(); self.output_buffer.clear(); - } - - // Having subtracted output updates from user output, consolidate the results to determine - // if there is anything worth reporting. Note: this also orders the results by value, so - // that could make the above merging plan even easier. - crate::consolidation::consolidate(&mut self.update_buffer); - - // Stash produced updates into both capability-indexed buffers and `output_produced`. - // The two locations are important, in that we will compact `output_produced` as we move - // through times, but we cannot compact the output buffers because we need their actual - // times. - if !self.update_buffer.is_empty() { - - // We *should* be able to find a capability for `next_time`. Any thing else would - // indicate a logical error somewhere along the way; either we release a capability - // we should have kept, or we have computed the output incorrectly (or both!) - let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time)); - let idx = outputs.len() - idx.expect("failed to find index") - 1; - for (val, diff) in self.update_buffer.drain(..) { - self.output_produced.push(((val.clone(), next_time.clone()), diff.clone())); - outputs[idx].1.push((val, next_time.clone(), diff)); - } - // Advance times in `self.output_produced` and consolidate the representation. - // NOTE: We only do this when we add records; it could be that there are situations - // where we want to consolidate even without changes (because an initially - // large collection can now be collapsed). - if let Some(meet) = meet.as_ref() { - for entry in &mut self.output_produced { - (entry.0).1 = (entry.0).1.join(meet); + // Having subtracted output updates from user output, consolidate the results to determine + // if there is anything worth reporting. Note: this also orders the results by value, so + // that could make the above merging plan even easier. + // + // Stash produced updates into both capability-indexed buffers and `output_produced`. + // The two locations are important, in that we will compact `output_produced` as we move + // through times, but we cannot compact the output buffers because we need their actual + // times. + crate::consolidation::consolidate(&mut self.update_buffer); + if !self.update_buffer.is_empty() { + + // We *should* be able to find a capability for `next_time`. Any thing else would + // indicate a logical error somewhere along the way; either we release a capability + // we should have kept, or we have computed the output incorrectly (or both!) + let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time)); + let idx = outputs.len() - idx.expect("failed to find index") - 1; + for (val, diff) in self.update_buffer.drain(..) { + self.output_produced.push(((val.clone(), next_time.clone()), diff.clone())); + outputs[idx].1.push((val, next_time.clone(), diff)); } + + // Advance times in `self.output_produced` and consolidate the representation. + // NOTE: We only do this when we add records; it could be that there are situations + // where we want to consolidate even without changes (because an initially + // large collection can now be collapsed). + if let Some(meet) = meet.as_ref() { for entry in &mut self.output_produced { (entry.0).1.join_assign(meet); } } + crate::consolidation::consolidate(&mut self.output_produced); } - crate::consolidation::consolidate(&mut self.output_produced); } } @@ -601,17 +578,8 @@ mod history_replay { // Any time, even uninteresting times, must be joined with the current accumulation of // batch times as well as the current accumulation of `times_current`. - for &((_, ref time), _) in batch_replay.buffer().iter() { - if !time.less_equal(&next_time) { - self.temporary.push(time.join(&next_time)); - } - } - for time in self.times_current.iter() { - if !time.less_equal(&next_time) { - self.temporary.push(time.join(&next_time)); - } - } - + self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); sort_dedup(&mut self.temporary); // Introduce synthetic times, and re-organize if we add any. From 8e75167dba1ca16d2c89c90b5836666d9d875d0c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 06:18:31 -0400 Subject: [PATCH 13/15] Tidy explanatory text, and reflect frontier --- differential-dataflow/src/operators/reduce.rs | 37 +++++++------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 650feeb0d..9e159e428 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -78,25 +78,15 @@ where let mut output_upper = Antichain::from_elem(::minimum()); let mut output_lower = Antichain::from_elem(::minimum()); - move |(input, _frontier), output| { + move |(input, frontier), output| { - // The `reduce` operator receives fully formed batches, which each serve as an indication - // that the frontier has advanced to the upper bound of their description. + // The operator receives input batches, which it treats as contiguous and will collect and + // then process as one batch. It captures the input frontier from the batches, from the upstream + // trace, and from the input frontier, and retires the work through that interval. // - // Although we could act on each individually, several may have been sent, and it makes - // sense to accumulate them first to coordinate their re-evaluation. We will need to pay - // attention to which times need to be collected under which capability, so that we can - // assemble output batches correctly. We will maintain several builders concurrently, and - // place output updates into the appropriate builder. - // - // It turns out we must use notificators, as we cannot await empty batches from arrange to - // indicate progress, as the arrange may not hold the capability to send such. Instead, we - // must watch for progress here (and the upper bound of received batches) to tell us how - // far we can process work. - // - // We really want to retire all batches we receive, so we want a frontier which reflects - // both information from batches as well as progress information. I think this means that - // we keep times that are greater than or equal to a time in the other frontier, deduplicated. + // Reduce may retain capabilities and need to perform work and produce output at times that + // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)` + // may result in outputs at `(1, 1)` as well, even with no input at that time. let mut batch_cursors = Vec::new(); let mut batch_storage = Vec::new(); @@ -105,23 +95,22 @@ where lower_limit.clear(); lower_limit.extend(upper_limit.borrow().iter().cloned()); - // Drain the input stream of batches, validating the contiguity of the batch descriptions and - // capturing a cursor for each of the batches as well as ensuring we hold a capability for the - // times in the batch. + // Drain input batches in order, capturing capabilities and the last upper. input.for_each(|capability, batches| { - + capabilities.insert(capability.retain(0)); for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor()); batch_storage.push(batch); } - - // Ensure that `capabilities` covers the capability of the batch. - capabilities.insert(capability.retain(0)); }); // Pull in any subsequent empty batches we believe to exist. source_trace.advance_upper(&mut upper_limit); + // Incorporate the input frontier guarantees as well. + let mut joined = Antichain::new(); + crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined); + upper_limit = joined; // Only if our upper limit has advanced should we do work. if upper_limit != lower_limit { From 7c053ade8bb2afe6183a95860d076e99296fa998 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 06:46:33 -0400 Subject: [PATCH 14/15] Tighten comments, remove mutable borrow --- differential-dataflow/src/operators/reduce.rs | 48 ++++++------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 9e159e428..f6e5ad60b 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -112,18 +112,15 @@ where crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined); upper_limit = joined; - // Only if our upper limit has advanced should we do work. + // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed. if upper_limit != lower_limit { - // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send - // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches - // to indicate forward progress, and must hope that downstream operators look at progress frontiers - // as well as batch descriptions. - // - // We can (and should) advance source and output traces if `upper_limit` indicates this is possible. + // If we hold no capabilitys in the interval [lower_limit, upper_limit) then we have no compute needs, + // and could not transmit the outputs even if they were (incorrectly) non-zero. + // We do have maintenance work after this logic, and should not fuse this test with the above test. if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) { - // `interesting` contains "warnings" about keys and times that may need to be re-considered. + // `interesting` contains "todos" about key and time pairs that should be re-considered. // We first extract those times from this list that lie in the interval we will process. sort_dedup(&mut interesting); // `exposed` contains interesting (key, time)s now below `upper_limit` @@ -139,12 +136,6 @@ where }); // Prepare an output buffer and builder for each capability. - // - // We buffer and build separately, as outputs are produced grouped by time, whereas the - // builder wants to see outputs grouped by value. While the per-key computation could - // do the re-sorting itself, buffering per-key outputs lets us double check the results - // against other implementations for accuracy. - // // TODO: It would be better if all updates went into one batch, but timely dataflow prevents // this as long as it requires that there is only one capability for each message. let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new(); @@ -163,11 +154,7 @@ where let mut thinker = history_replay::HistoryReplayer::new(); - // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`. - // - // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length - // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`. - // There could perhaps be a less provocative variable name. + // March through the keys we must work on, merging `batch_cursors` and `exposed`. let mut exposed_position = 0; while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() { @@ -181,13 +168,9 @@ where (None, None) => unreachable!(), }; - // `interesting_times` contains those times between `lower_issued` and `upper_limit` - // that we need to re-consider. We now populate it, but perhaps this should be left - // to the per-key computation, which may be able to avoid examining the times of some - // values (for example, in the case of min/max/topk). + // Populate `interesting_times` with interesting times not beyond `upper_limit`. + // TODO: This could just be `exposed_time` and `lower .. upper` bounds. interesting_times.clear(); - - // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. while exposed_keys.get(exposed_position) == Some(key) { interesting_times.push(T1::owned_time(exposed_time.index(exposed_position))); exposed_position += 1; @@ -202,22 +185,19 @@ where (&mut source_cursor, source_storage), (&mut output_cursor, output_storage), (&mut batch_cursor, batch_storage), - &mut interesting_times, + &interesting_times, &mut logic, &upper_limit, &mut buffers[..], &mut new_interesting_times, ); - if batch_cursor.get_key(batch_storage) == Some(key) { - batch_cursor.step_key(batch_storage); - } + // Advance the cursor if this key, so that the loop's validity check registers the work as done. + if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); } // Record future warnings about interesting times (and assert they should be "future"). - for time in new_interesting_times.drain(..) { - debug_assert!(upper_limit.less_equal(&time)); - interesting.push((T1::owned_key(key), time)); - } + debug_assert!(new_interesting_times.iter().all(|t| upper_limit.less_equal(t))); + interesting.extend(new_interesting_times.drain(..).map(|t| (T1::owned_key(key), t))); // Sort each buffer by value and move into the corresponding builder. // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, @@ -367,7 +347,7 @@ mod history_replay { (source_cursor, source_storage): (&mut C1, &'a C1::Storage), (output_cursor, output_storage): (&mut C2, &'a C2::Storage), (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), - times: &mut Vec, + times: &Vec, logic: &mut L, upper_limit: &Antichain, outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], From 5281266452aea5e6894dafc02c31409342aa6d6f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 4 Apr 2026 07:00:05 -0400 Subject: [PATCH 15/15] Prefer insert_ref to insert --- differential-dataflow/src/operators/reduce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index f6e5ad60b..b5b2aac4c 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -227,7 +227,7 @@ where output_upper.clear(); output_upper.extend(upper_limit.borrow().iter().cloned()); for capability in &capabilities[index + 1 ..] { - output_upper.insert(capability.time().clone()); + output_upper.insert_ref(capability.time()); } if output_upper.borrow() != output_lower.borrow() {