Skip to content

Commit b3a0d48

Browse files
committed
Less quadratic half_join
1 parent d50868e commit b3a0d48

2 files changed

Lines changed: 387 additions & 1 deletion

File tree

Lines changed: 384 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,384 @@
1+
/// Streaming asymmetric join between updates (K,V1) and an arrangement on (K,V2).
2+
///
3+
/// The asymmetry is that the join only responds to streamed updates, not to changes in the arrangement.
4+
/// Streamed updates join only with matching arranged updates at lesser times *in the total order*, and
5+
/// subject to a predicate supplied by the user (roughly: strictly less, or not).
6+
///
7+
/// This behavior can ensure that any pair of matching updates interact exactly once.
8+
///
9+
/// There are various forms of this operator with tangled closures about how to emit the outputs and
10+
/// wrangle the logical compaction frontier in order to preserve the distinctions around times that are
11+
/// strictly less (conventional compaction logic would collapse unequal times to the frontier, and lose
12+
/// the distinction).
13+
///
14+
/// The methods also carry an auxiliary time next to the value, which is used to advance the joined times.
15+
/// This is .. a byproduct of wanting to allow advancing times a la `join_function`, without breaking the
16+
/// coupling by total order on "initial time".
17+
///
18+
/// The doc comments for individual methods are a bit of a mess. Sorry.
19+
20+
use std::collections::VecDeque;
21+
use std::ops::Mul;
22+
23+
use timely::ContainerBuilder;
24+
use timely::container::CapacityContainerBuilder;
25+
use timely::dataflow::{Scope, ScopeParent, Stream};
26+
use timely::dataflow::channels::pact::{Pipeline, Exchange};
27+
use timely::dataflow::operators::{Capability, Operator, generic::Session};
28+
use timely::PartialOrder;
29+
use timely::progress::{Antichain, ChangeBatch, Timestamp};
30+
use timely::progress::frontier::MutableAntichain;
31+
32+
use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable};
33+
use differential_dataflow::difference::{Monoid, Semigroup};
34+
use differential_dataflow::lattice::Lattice;
35+
use differential_dataflow::operators::arrange::Arranged;
36+
use differential_dataflow::trace::{Cursor, TraceReader};
37+
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
38+
use differential_dataflow::trace::implementations::BatchContainer;
39+
40+
use timely::dataflow::operators::CapabilitySet;
41+
42+
/// A binary equijoin that responds to updates on only its first input.
43+
///
44+
/// This operator responds to inputs of the form
45+
///
46+
/// ```ignore
47+
/// ((key, val1, time1), initial_time, diff1)
48+
/// ```
49+
///
50+
/// where `initial_time` is less or equal to `time1`, and produces as output
51+
///
52+
/// ```ignore
53+
/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2)
54+
/// ```
55+
///
56+
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
57+
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
58+
/// This last constraint is important to ensure that we correctly produce
59+
/// all pairs of output updates across multiple `half_join` operators.
60+
///
61+
/// Notice that the time is hoisted up into data. The expectation is that
62+
/// once out of the "delta flow region", the updates will be `delay`d to the
63+
/// times specified in the payloads.
64+
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
65+
stream: VecCollection<G, (K, V, G::Timestamp), R>,
66+
arrangement: Arranged<G, Tr>,
67+
frontier_func: FF,
68+
comparison: CF,
69+
mut output_func: S,
70+
) -> VecCollection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
71+
where
72+
G: Scope<Timestamp = Tr::Time>,
73+
K: Hashable + ExchangeData,
74+
V: ExchangeData,
75+
R: ExchangeData + Monoid,
76+
Tr: TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
77+
R: Mul<Tr::Diff, Output: Semigroup>,
78+
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
79+
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
80+
DOut: Clone+'static,
81+
S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
82+
{
83+
let output_func = move |session: &mut SessionFor<G, _>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| {
84+
for (time, diff2) in output.drain(..) {
85+
let diff = diff1.clone() * diff2.clone();
86+
let dout = (output_func(k, v1, v2), time.clone());
87+
session.give((dout, initial.clone(), diff));
88+
}
89+
};
90+
half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func)
91+
.as_collection()
92+
}
93+
94+
/// A session with lifetimes `'a` and `'b` in a scope `G` with a container builder `CB`.
///
/// This is a shorthand primarily for the reason of readability.
type SessionFor<'a, 'b, G, CB> =
    Session<'a, 'b,
        <G as ScopeParent>::Timestamp,
        CB,
        Capability<<G as ScopeParent>::Timestamp>,
    >;
103+
104+
/// An unsafe variant of `half_join` where the `output_func` closure takes
/// additional arguments a vector of `time` and `diff` tuples as input and
/// writes its outputs at a container builder. The container builder
/// can, but isn't required to, accept `(data, time, diff)` triplets.
/// This allows for more flexibility, but is more error-prone.
///
/// This operator responds to inputs of the form
///
/// ```ignore
/// ((key, val1, time1), initial_time, diff1)
/// ```
///
/// where `initial_time` is less or equal to `time1`, and produces as output
///
/// ```ignore
/// output_func(session, key, val1, val2, initial_time, diff1, &[lub(time1, time2), diff2])
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
///
/// The `yield_function` allows the caller to indicate when the operator should
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
///
/// Internally the operator stages work in two kinds of piles: "stuck" blobs,
/// sorted by `(initial, data)` and nibbled from the front as the arrangement
/// frontier advances, and "ready" blobs, sorted by `(data, initial)` and
/// consumed one record at a time against the trace (see `StuckBlob` and
/// `ReadyBlob` below). This two-stage organization is what avoids repeatedly
/// rescanning all pending work on each activation.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
    stream: VecCollection<G, (K, V, G::Timestamp), R>,
    mut arrangement: Arranged<G, Tr>,
    frontier_func: FF,
    comparison: CF,
    yield_function: Y,
    mut output_func: S,
) -> Stream<G, CB::Container>
where
    G: Scope<Timestamp = Tr::Time>,
    K: Hashable + ExchangeData,
    V: ExchangeData,
    R: ExchangeData + Monoid,
    Tr: for<'a> TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
    Y: Fn(std::time::Instant, usize) -> bool + 'static,
    S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
    CB: ContainerBuilder,
{
    // No need to block physical merging for this operator.
    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
    let mut arrangement_trace = Some(arrangement.trace);
    let arrangement_stream = arrangement.stream;

    // Route stream updates by key hash so they meet the matching arrangement shard.
    let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

    // Stash for (time, diff) accumulation.
    let mut output_buffer = Vec::new();

    // Stage 1: Stuck blobs sorted by (initial, data). Nibbled from the front
    // as the arrangement frontier advances to determine eligible records.
    let mut stuck: Vec<StuckBlob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();
    // Stage 2: Ready blobs sorted by (data, initial). Consumed from the front
    // one record at a time, yield-safe.
    let mut ready: Vec<ReadyBlob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();
    // Buffer for new arrivals, stored as (initial, data, diff) for direct consolidation.
    let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new();

    let scope = stream.scope();
    stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activator = scope.activator_for(info.address);

        move |(input1, frontier1), (input2, frontier2), output| {

            // Drain all input for this activation into a single buffer.
            let mut caps = CapabilitySet::new();
            input1.for_each(|capability, data| {
                caps.insert(capability.retain(0));
                // Rearrange to (initial, data, diff) so consolidation sorts by initial time.
                arriving.extend(data.drain(..).map(|(d, t, r)| (t, d, r)));
            });

            // Form a new blob from this activation's arrivals.
            if !arriving.is_empty() {
                consolidate_updates(&mut arriving);

                // Consolidation may have cancelled everything; only then build a blob.
                if !arriving.is_empty() {
                    let mut lower = MutableAntichain::new();
                    // TODO: consider implementing `update_iter_ref` to avoid clone.
                    lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1)));

                    // Shrink the held capabilities to exactly cover the retained times.
                    caps.downgrade(&lower.frontier());

                    stuck.push(StuckBlob {
                        caps,
                        lower,
                        data: std::mem::take(&mut arriving).into(),
                    });
                }
            }

            // Drain input batches; although we do not observe them, we want access to the input
            // to observe the frontier and to drive scheduling.
            input2.for_each(|_, _| { });

            // Local variables to track if and when we should exit early.
            let mut yielded = false;
            let timer = std::time::Instant::now();
            let mut work = 0;

            if let Some(ref mut trace) = arrangement_trace {

                let frontier = frontier2.frontier();

                // Stage 2 first: drain ready piles (key-sorted, yield-safe).
                for pile in ready.iter_mut() {
                    if yielded { break; }

                    let (mut cursor, storage) = trace.cursor();
                    let mut key_con = Tr::KeyContainer::with_capacity(1);
                    // Batch capability bookkeeping: record per-record removals here
                    // and apply them to `pile.lower` once after the loop.
                    let mut removals: ChangeBatch<G::Timestamp> = ChangeBatch::new();
                    // Cache a delayed capability. Reusable for any `initial` that is
                    // greater or equal to the cached time in the partial order.
                    let mut cached_cap: Option<Capability<G::Timestamp>> = None;

                    while let Some(((ref key, ref val1, ref time), ref initial, ref diff1)) = pile.data.front() {
                        yielded = yielded || yield_function(timer, work);
                        if yielded { break; }

                        // Reuse cached capability if its time is <= initial.
                        if !cached_cap.as_ref().is_some_and(|cap| cap.time().less_equal(initial)) {
                            cached_cap = Some(pile.caps.delayed(initial));
                        }
                        let cap = cached_cap.as_ref().unwrap();

                        key_con.clear(); key_con.push_own(&key);
                        cursor.seek_key(&storage, key_con.index(0));
                        if cursor.get_key(&storage) == key_con.get(0) {
                            while let Some(val2) = cursor.get_val(&storage) {
                                // Gather all arranged times passing `comparison`, joined
                                // up to `time` so outputs land at lub(time, time2).
                                cursor.map_times(&storage, |t, d| {
                                    if comparison(t, initial) {
                                        let mut t = Tr::owned_time(t);
                                        t.join_assign(time);
                                        output_buffer.push((t, Tr::owned_diff(d)))
                                    }
                                });
                                consolidate(&mut output_buffer);
                                work += output_buffer.len();
                                // TODO: Worry about how to avoid reconstructing sessions so often.
                                output_func(&mut output.session_with_builder(&cap), key, val1, val2, initial, diff1, &mut output_buffer);
                                output_buffer.clear();
                                cursor.step_val(&storage);
                            }
                            cursor.rewind_vals(&storage);
                        }

                        // Record fully processed; retire its `initial` time.
                        let (_, initial, _) = pile.data.pop_front().unwrap();
                        removals.update(initial, -1);
                    }

                    // Apply all removals in bulk and downgrade once.
                    if !removals.is_empty() {
                        pile.lower.update_iter(removals.drain());
                        pile.caps.downgrade(&pile.lower.frontier());
                    }
                }
                ready.retain(|pile| !pile.data.is_empty());

                // Stage 1: nibble blobs to produce new ready piles.
                if !yielded {
                    // Put the total-order minimum of the frontier into a TimeContainer
                    // so we can call `comparison`. Since `comparison` is monotone with
                    // the total order, only the minimum matters.
                    let mut time_con = Tr::TimeContainer::with_capacity(1);
                    if let Some(min_time) = frontier.iter().min() {
                        time_con.push_own(min_time);
                    }

                    // Collect all eligible records across all blobs into one pile.
                    let mut eligible = Vec::new();
                    let mut eligible_caps = CapabilitySet::new();

                    for blob in stuck.iter_mut() {
                        // Pop eligible records from the front. The deque is sorted by
                        // initial time in total order, and `comparison` is monotone,
                        // so we stop at the first ineligible record.
                        let before = eligible.len();
                        while let Some((initial, _, _)) = blob.data.front() {
                            // An empty `time_con` (empty frontier) makes every record eligible.
                            if (0..time_con.len()).any(|i| comparison(time_con.index(i), initial)) {
                                break;
                            }
                            eligible.push(blob.data.pop_front().unwrap());
                        }

                        if eligible.len() > before {
                            // Grab caps for the eligible records before downgrading the blob.
                            let mut frontier = Antichain::new();
                            for (initial, _, _) in eligible[before..].iter() {
                                frontier.insert(initial.clone());
                            }
                            for time in frontier.iter() {
                                eligible_caps.insert(blob.caps.delayed(time));
                            }

                            // Remove eligible times from the blob's antichain and downgrade.
                            blob.lower.update_iter(eligible[before..].iter().map(|(t, _, _)| (t.clone(), -1)));
                            blob.caps.downgrade(&blob.lower.frontier());
                        }
                    }

                    stuck.retain(|blob| !blob.data.is_empty());

                    if !eligible.is_empty() {
                        // Rearrange to (data, initial, diff) and consolidate.
                        // consolidate_updates sorts by (data, initial) which is
                        // the order we want for the active blob.
                        let mut active_data: Vec<_> = eligible.into_iter()
                            .map(|(t, d, r)| (d, t, r))
                            .collect();
                        consolidate_updates(&mut active_data);

                        if !active_data.is_empty() {
                            let mut pile_lower = MutableAntichain::new();
                            pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1)));
                            eligible_caps.downgrade(&pile_lower.frontier());

                            ready.push(ReadyBlob {
                                caps: eligible_caps,
                                lower: pile_lower,
                                data: VecDeque::from(active_data),
                            });
                        }
                    }
                }
            }

            // Re-activate if we have ready piles to process.
            if !ready.is_empty() {
                activator.activate();
            }

            // The logical merging frontier depends on input1, stuck, and ready blobs.
            let mut frontier = Antichain::new();
            for time in frontier1.frontier().iter() {
                frontier_func(time, &mut frontier);
            }
            for blob in stuck.iter() {
                for cap in blob.caps.iter() {
                    frontier_func(cap.time(), &mut frontier);
                }
            }
            for blob in ready.iter() {
                for cap in blob.caps.iter() {
                    frontier_func(cap.time(), &mut frontier);
                }
            }
            arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

            // All inputs drained and no pending work: release the trace handle.
            if frontier1.is_empty() && stuck.is_empty() && ready.is_empty() {
                arrangement_trace = None;
            }
        }
    })
}
365+
366+
/// Stuck work sorted by `(initial, data)`. Nibbled from the front as the
/// arrangement frontier advances to determine eligible records.
struct StuckBlob<D, T: Timestamp, R> {
    // Capabilities covering the `initial` times still present in `data`;
    // downgraded to `lower`'s frontier as records are removed.
    caps: CapabilitySet<T>,
    // Multiset of the `initial` times remaining in `data`; its frontier
    // determines how far `caps` can be downgraded.
    lower: MutableAntichain<T>,
    // Updates as `(initial, data, diff)`, front-to-back by `initial` in the
    // total order (the order produced by `consolidate_updates`).
    data: VecDeque<(T, D, R)>,
}
373+
374+
/// Ready work sorted by `(data, initial)`. Consumed from the front one record
375+
/// at a time during trace lookups. Yield-safe: can stop and resume at any point,
376+
/// because the work can be resumed without re-sorting. Strictly speaking we will
377+
/// do a MutableAntichain rebuild, which could be linear time, but fixing that is
378+
/// future work.
379+
// TODO: Fix the thing in the comments.
380+
struct ReadyBlob<D, T: Timestamp, R> {
381+
caps: CapabilitySet<T>,
382+
lower: MutableAntichain<T>,
383+
data: VecDeque<(D, T, R)>,
384+
}

0 commit comments

Comments
 (0)