diff --git a/interactive/examples/common/ddir_expl_lib.rs b/interactive/examples/common/ddir_expl_lib.rs new file mode 100644 index 000000000..45038a9d7 --- /dev/null +++ b/interactive/examples/common/ddir_expl_lib.rs @@ -0,0 +1,591 @@ +//! Shared DD IR explanation rendering. Included by both ddir_expl.rs (bench) +//! and reach_explain.rs (demo) via `#[path = "common/ddir_expl_lib.rs"] mod expl;`. +//! +//! Lives under `examples/common/` (not `interactive/src/`) on purpose: this is +//! still a spike. Promotion to `interactive::explain` is gated on: +//! - SCC (and the rest of the program suite) producing correct, terminating +//! explanations — currently SCC hangs. +//! - Generalizing the hardcoded `Row = SmallVec<[i64; 2]>` and +//! `DdirTime = Product<u64, PointStamp<u64>>` in line with whatever IR +//! parameterization the rest of `interactive` settles on. +//! - Replacing the `pack_pair` / `pack_triple` length-prefix-of-i64 hack +//! with a real encoding. +//! - Replacing `panic!(...)` shape assertions with typed IR invariants +//! (or returned errors) and routing `Inspect`'s `eprintln!` through +//! whatever logging story `interactive` adopts. +//! - Factoring the renderer so forward and forward+reverse share one pass +//! with a mode flag, rather than duplicating `lower::render_program`. 
+ +use std::collections::HashMap; +use timely::order::Product; +use timely::dataflow::Scope; +use differential_dataflow::VecCollection; +use differential_dataflow::operators::iterate::VecVariable; +use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; +use differential_dataflow::dynamic::feedback_summary; +use differential_dataflow::trace::implementations::ValSpine; +use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use smallvec::SmallVec; +use smallvec::smallvec as svec; + +use interactive::parse; +use interactive::ir::{Node, LinearOp, Program, Diff, Id, Time, eval_fields, eval_field_into, eval_condition}; + +/// Lift forward times into data: emit `((d, t_inner), t, r)` where `t_inner` is the +/// PointStamp iteration vector with the outer "round of input" coordinate integrated +/// out (set to 0). Causal-time tagging in dep should depend on derivation-iteration +/// depth, not on which input round delivered the row — so two rows that appear at the +/// same iteration in different rounds get the same lifted time, allowing the reducer +/// to consolidate them. +pub fn lift_times<'scope, D>(coll: VecCollection<'scope, DdirTime, D, Diff>) + -> VecCollection<'scope, DdirTime, (D, DdirTime), Diff> +where + D: differential_dataflow::Data, +{ + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::AsCollection; + use timely::dataflow::operators::generic::operator::Operator; + coll.inner.unary(Pipeline, "lift_times", move |_,_| move |input, output| { + input.for_each(|cap, data| { + let mut session = output.session(&cap); + for (d, t, r) in data.iter() { + let t_inner_only = Product::new(0u64, t.inner.clone()); + session.give(((d.clone(), t_inner_only), t.clone(), *r)); + } + }); + }).as_collection() +} + +/// For each key in `coll`, compute the earliest forward time at which it appeared +/// AND is currently present (cumulative multiplicity > 0). 
Returns an arrangement +/// keyed by `K`, value `T` (the first time, or absent if currently retracted). +pub fn first_time_arr<'scope, K>(coll: VecCollection<'scope, DdirTime, K, Diff>) + -> Arranged<'scope, TraceAgent>> +where + K: differential_dataflow::ExchangeData + differential_dataflow::Hashable, +{ + let lifted = lift_times(coll); + lifted.arrange_by_key() + .reduce_abelian::<_, differential_dataflow::trace::implementations::ValBuilder<_,_,_,_>, ValSpine, _>( + "MinTime", + |_k, vals, out| { + // vals are (lifted_time, multiplicity) per key; lifted_time is the time of + // a delta. A row is currently present iff sum of multiplicities > 0. The + // first-time is the min lifted_time among positive contributors. + let total: Diff = vals.iter().map(|(_, m)| *m).sum(); + if total > 0 { + if let Some(min_t) = vals.iter().filter(|(_, m)| *m > 0).map(|(t, _)| (*t).clone()).min() { + out.push((min_t, 1)); + } + } + }, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v, t, r)| ((key.clone(), v), t, r))); }, + ) +} + +pub type Row = SmallVec<[i64; 2]>; +pub type DdirTime = Product>; +pub type Col<'scope, T> = VecCollection<'scope, T, (Row, Row), Diff>; +pub type Arr<'scope, T> = Arranged<'scope, TraceAgent>>; +/// Tagged depends element: (node Id, key, val, causal_time). +/// `causal_time` is the upstream-imposed bound: this dep entry must be witnessed +/// by inputs at forward time ≤ `causal_time`. Pass-through ops preserve it; +/// witness-picking ops (Linear/Join/Distinct) filter by it and re-tag with the +/// chosen witness time; multi-source ops (Concat) restrict per-branch by it. 
+pub type DepRow = (Id, Row, Row, DdirTime); +pub type DepCol<'scope, T> = VecCollection<'scope, T, DepRow, Diff>; + +pub enum Rendered<'scope, T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice> { + Collection(Col<'scope, T>), + Arrangement(Arr<'scope, T>), +} + +impl<'scope, T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice> Rendered<'scope, T> { + fn collection(&self) -> Col<'scope, T> { + match self { + Rendered::Collection(c) => c.clone(), + Rendered::Arrangement(a) => a.clone().as_collection(|k, v| (k.clone(), v.clone())), + } + } + fn arrange(&self) -> Arr<'scope, T> { + match self { + Rendered::Arrangement(a) => a.clone(), + Rendered::Collection(c) => c.clone().arrange_by_key(), + } + } +} + +/// Per-scope-level depends machinery. Contributions are concat'd and bound +/// to `var` at scope exit. `dep_col` is the depends collection visible to +/// rules inside this scope (it cycles through `var`). +pub struct DepScope<'scope, T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice> { + var: VecVariable<'scope, T, DepRow, Diff>, + dep_col: DepCol<'scope, T>, + contribs: Vec>, +} + +/// `coord` is the PointStamp coordinate index (0-based) this dep scope perturbs. +pub fn new_dep_scope<'scope>( + scope: Scope<'scope, DdirTime>, + coord: usize, +) -> DepScope<'scope, DdirTime> { + let step: Product> = Product::new(0, feedback_summary::(coord + 1, 1)); + let (var, dep_col) = VecVariable::::new(scope, step); + DepScope { var, dep_col, contribs: Vec::new() } +} + +pub fn render_program<'scope>( + program: &Program, + scope: Scope<'scope, DdirTime>, + inputs: &[Col<'scope, DdirTime>], + queries: &Col<'scope, DdirTime>, + query_id: Id, +) -> (HashMap>, DepCol<'scope, DdirTime>) +{ + let mut nodes: HashMap> = HashMap::new(); + let mut level: usize = 0; + let mut variables: HashMap, usize)> = HashMap::new(); + + // Single dep VecVariable at PointStamp coord 0 holds all depends across all IR nodes. 
+ // User-program Variables use coord >= 1 (no conflict). Cycles in the depends graph use + // this single feedback. + let mut dep_scope = new_dep_scope(scope.clone(), 0); + + // Per-IR-node depth (currently informational). + let mut node_depth: HashMap = HashMap::new(); + + // Seed: deferred until after the loop, since we need nodes[&query_id] rendered to + // compute the first-time tag. + + for (&id, node) in program.nodes.iter() { + node_depth.insert(id, level); + match node { + Node::Input(i) => { + nodes.insert(id, Rendered::Collection(inputs[*i].clone())); + // No reverse rule needed; depends-at-Input is the explanation surfaced to user. + }, + Node::Linear { input, ops } => { + let c = nodes[input].collection(); + let ops_fwd = ops.clone(); + let lvl = level; + // Forward. + let r = c.clone().join_function(move |(key, val)| { + use timely::progress::Timestamp; + let mut results: smallvec::SmallVec<[((Row, Row), Time, Diff); 2]> = svec![((key, val), Time::minimum(), 1)]; + for op in &ops_fwd { + let mut next = smallvec::SmallVec::new(); + for ((k, v), t, d) in results { + match op { + LinearOp::Project(proj) => { + let i = [k.as_slice(), v.as_slice()]; + next.push(((eval_fields(&proj.key, &i), eval_fields(&proj.val, &i)), t, d)); + }, + LinearOp::Filter(cond) => { + let i = [k.as_slice(), v.as_slice()]; + if eval_condition(cond, &i) { next.push(((k, v), t, d)); } + }, + LinearOp::Negate => { next.push(((k, v), t, -d)); }, + LinearOp::EnterAt(field) => { + let delay = { + let mut r = Row::new(); + eval_field_into(field, &[k.as_slice(), v.as_slice()], &mut r); + 256 * (64 - (r.as_slice().first().copied().unwrap_or(0) as u64).leading_zeros() as u64) + }; + let mut coords = smallvec::SmallVec::<[u64; 1]>::new(); + for _ in 0..lvl.saturating_sub(1) { coords.push(0); } + coords.push(delay); + next.push(((k, v), Product::new(0u64, PointStamp::new(coords)), d)); + }, + } + } + results = next; + } + results + }); + nodes.insert(id, Rendered::Collection(r.clone())); + 
+ // Reverse: build witness ((k_out, v_out), (k_in, v_in)) by re-executing chain. + let ops_rev = ops.clone(); + let witness = c.clone().flat_map(move |(k_in, v_in)| { + use timely::progress::Timestamp as _; + let key0 = k_in.clone(); + let val0 = v_in.clone(); + let mut results: smallvec::SmallVec<[(Row, Row); 2]> = svec![(k_in, v_in)]; + for op in &ops_rev { + let mut next: smallvec::SmallVec<[(Row, Row); 2]> = smallvec::SmallVec::new(); + for (k, v) in results { + match op { + LinearOp::Project(proj) => { + let i = [k.as_slice(), v.as_slice()]; + next.push((eval_fields(&proj.key, &i), eval_fields(&proj.val, &i))); + }, + LinearOp::Filter(cond) => { + let i = [k.as_slice(), v.as_slice()]; + if eval_condition(cond, &i) { next.push((k, v)); } + }, + LinearOp::Negate => { next.push((k, v)); }, + LinearOp::EnterAt(_) => { next.push((k, v)); }, + } + } + results = next; + } + let _ = Time::minimum(); + let key0 = key0; let val0 = val0; + results.into_iter().map(move |(ko, vo)| ((ko, vo), (key0.clone(), val0.clone()))) + }); + // Time-pruned reverse: + // 1. Lift forward output, reduce to first-occurrence time per (k_out, v_out). + // 2. Lift witness with time; join with first-time, keep only witnesses + // whose time is ≤ first-time of the output (causally preceding). + // 3. Reduce filtered witnesses to ONE per output (lex-min as tiebreaker). + // 4. Use as the witness arrangement for the dep-driven join. + // Time-tagged reverse: dep at this id carries causal-time bound T_dep. + // Pick FIRST (min t_w) witness with t_w ≤ T_dep, push (input_id, k_in, v_in, t_w). 
+ use timely::order::PartialOrder; + let _ = r; + let witness_keyed = witness.map(|((ko, vo), (ki, vi))| (pack_pair(&ko, &vo), pack_pair(&ki, &vi))); + let witness_lifted_arr = lift_times(witness_keyed) + .map(|((kout, kin), t_w)| (kout, (kin, t_w))) + .arrange_by_key(); + let dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let dep_here = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(|(_, k, v, t_dep)| (pack_pair(&k, &v), t_dep)); + let dep_arr = dep_here.arrange_by_key(); + let input_id = *input; + let candidates = dep_arr.join_core(witness_lifted_arr, |kout, t_dep, kin_tw| { + let (kin, t_w) = kin_tw; + if PartialOrder::less_equal(t_w, t_dep) { + Some(((kout.clone(), t_dep.clone()), (kin.clone(), t_w.clone()))) + } else { None } + }); + let chosen_arr = candidates.arrange_by_key() + .reduce_abelian::<_, differential_dataflow::trace::implementations::ValBuilder<_,_,_,_>, ValSpine<(Row, DdirTime), (Row, DdirTime), DdirTime, Diff>, _>( + "PickLinearWitness", + |_k, vals, out| { + if let Some((kt, _)) = vals.iter().min_by_key(|(kt, _)| (kt.1.clone(), kt.0.clone())) { + out.push(((*kt).clone(), 1)); + } + }, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v, t, r)| ((key.clone(), v), t, r))); }, + ); + let pushed = chosen_arr.as_collection(move |_k, kin_tw| { + let (kin_packed, t_w) = kin_tw.clone(); + let (ki, vi) = unpack_pair(&kin_packed); + (input_id, ki, vi, t_w) + }); + dep_scope.contribs.push(pushed); + }, + Node::Concat(ids) => { + let mut r = nodes[&ids[0]].collection(); + for i in &ids[1..] { r = r.concat(nodes[i].collection()); } + nodes.insert(id, Rendered::Collection(r)); + // Time-tagged reverse: for each branch b, push (b, k, v, t_b_first) only if + // b's first-time of (k, v) is ≤ T_dep. The pushed tag is b's actual contribution + // time — a tighter bound than T_dep for downstream rules. 
+ use timely::order::PartialOrder; + let dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let dep_here_arr = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(|(_, k, v, t_dep)| ((k, v), t_dep)) + .arrange_by_key(); + for &b in ids { + let branch_first = first_time_arr(nodes[&b].collection()); + let pushed = dep_here_arr.clone().join_core(branch_first, move |kv, t_dep, t_b| { + if PartialOrder::less_equal(t_b, t_dep) { + Some((b, kv.0.clone(), kv.1.clone(), t_b.clone())) + } else { None } + }); + dep_scope.contribs.push(pushed); + } + }, + Node::Arrange(input) => { + nodes.insert(id, Rendered::Arrangement(nodes[input].arrange())); + // Reverse: pass through. + let dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let input_id = *input; + let pushed = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(move |(_, k, v, t)| (input_id, k, v, t)); + dep_scope.contribs.push(pushed); + }, + Node::Join { left, right, projection } => { + let Rendered::Arrangement(l) = &nodes[left] else { panic!("Join: left must be Arrangement") }; + let Rendered::Arrangement(r) = &nodes[right] else { panic!("Join: right must be Arrangement") }; + let l = l.clone(); + let r = r.clone(); + let proj_fwd = projection.clone(); + // Forward. + let result = l.clone().join_core(r.clone(), move |k, v1: &Row, v2: &Row| { + let i = [k.as_slice(), v1.as_slice(), v2.as_slice()]; + Some::<((Row, Row), _, _)>(((eval_fields(&proj_fwd.key, &i), eval_fields(&proj_fwd.val, &i)), (), 1)).into_iter().map(|((kk,vv),_,_)| (kk, vv)) + }); + nodes.insert(id, Rendered::Collection(result.clone())); + + // Reverse: build witness ((k_out, v_out), (key, lv, rv)) from forward join. + // We re-do the join but keep input parts. 
+ let proj_rev = projection.clone(); + let witness = l.join_core(r, move |k, v1: &Row, v2: &Row| { + let i = [k.as_slice(), v1.as_slice(), v2.as_slice()]; + let kk: Row = eval_fields(&proj_rev.key, &i); + let vv: Row = eval_fields(&proj_rev.val, &i); + let kn: Row = k.clone(); + let lv: Row = v1.clone(); + let rv: Row = v2.clone(); + Some::<((Row, Row, Row, Row, Row), _, _)>(((kk, vv, kn, lv, rv), (), 1)).into_iter().map(|((a,b,c,d,e),_,_)| (pack_pair(&a, &b), pack_triple(&c, &d, &e))) + }); + // Time-tagged reverse: pick FIRST (min t_w) witness with t_w ≤ T_dep, push + // (left_id, kn, lv, t_w) and (right_id, kn, rv, t_w). + use timely::order::PartialOrder; + let _ = result; + let witness_lifted_arr = lift_times(witness) + .map(|((kout, kin), t_w)| (kout, (kin, t_w))) + .arrange_by_key(); + let dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let dep_here = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(|(_, k, v, t_dep)| (pack_pair(&k, &v), t_dep)); + let dep_arr = dep_here.arrange_by_key(); + let left_id = *left; + let right_id = *right; + let candidates = dep_arr.join_core(witness_lifted_arr, |kout, t_dep, kin_tw| { + let (packed_in, t_w) = kin_tw; + if PartialOrder::less_equal(t_w, t_dep) { + Some(((kout.clone(), t_dep.clone()), (packed_in.clone(), t_w.clone()))) + } else { None } + }); + let chosen_arr = candidates.arrange_by_key() + .reduce_abelian::<_, differential_dataflow::trace::implementations::ValBuilder<_,_,_,_>, ValSpine<(Row, DdirTime), (Row, DdirTime), DdirTime, Diff>, _>( + "PickJoinWitness", + |_k, vals, out| { + if let Some((kt, _)) = vals.iter().min_by_key(|(kt, _)| (kt.1.clone(), kt.0.clone())) { + out.push(((*kt).clone(), 1)); + } + }, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v, t, r)| ((key.clone(), v), t, r))); }, + ); + let pushed_lr = chosen_arr.as_collection(move |_k, packed_tw| packed_tw.clone()) + .flat_map(move |(packed_in, t_w)| { + let (kn, lv, rv) = unpack_triple(&packed_in); + let 
l_row: DepRow = (left_id, kn.clone(), lv, t_w.clone()); + let r_row: DepRow = (right_id, kn, rv, t_w); + std::iter::once(l_row).chain(std::iter::once(r_row)) + }); + dep_scope.contribs.push(pushed_lr); + }, + Node::Reduce { input, reducer } => { + let Rendered::Arrangement(a) = &nodes[input] else { panic!("Reduce: input must be Arrangement") }; + let a = a.clone(); + let reducer_fwd = reducer.clone(); + use std::sync::Arc; + let f: Arc) + Send + Sync> = match reducer_fwd { + parse::Reducer::Min => Arc::new(|_key, vals, output| { if let Some(min) = vals.iter().map(|(v, _)| *v).min() { output.push((min.clone(), 1)); } }), + parse::Reducer::Distinct => Arc::new(|_key, _vals, output| { output.push((Row::new(), 1)); }), + parse::Reducer::Count => Arc::new(|_key, vals, output| { let count: Diff = vals.iter().map(|(_, d)| *d).sum(); if count > 0 { let mut r = Row::new(); r.push(count); output.push((r, 1)); } }), + }; + let reduced = a.clone().reduce_abelian::<_, differential_dataflow::trace::implementations::ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>, _>( + "Reduce", + move |k, v, o| f(k, v, o), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ); + nodes.insert(id, Rendered::Arrangement(reduced)); + + // Reverse rules differ per reducer: + // Distinct: pick ONE input row per dep key (lex-min v) — sufficient witness. + // Min: the dep's val IS an input val at that key — emit it directly. + // Count: needs all contributing rows (count depends on multiplicity). + let dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let input_id = *input; + use timely::order::PartialOrder; + match reducer { + parse::Reducer::Distinct => { + // Time-tagged: pick FIRST input (k, v) (min t_in) at key k with t_in ≤ T_dep. 
+ let dep_kt = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(|(_, k, _, t_dep)| (k, t_dep)); + let dep_kt_arr = dep_kt.arrange_by_key(); + let input_lifted_arr = lift_times(a.as_collection(|k, v| (k.clone(), v.clone()))) + .map(|((k, v), t)| (k, (v, t))) + .arrange_by_key(); + let candidates = dep_kt_arr.join_core(input_lifted_arr, |k, t_dep, v_t_in| { + let (v, t_in) = v_t_in; + if PartialOrder::less_equal(t_in, t_dep) { + Some(((k.clone(), t_dep.clone()), (v.clone(), t_in.clone()))) + } else { None } + }); + let chosen_arr = candidates.arrange_by_key() + .reduce_abelian::<_, differential_dataflow::trace::implementations::ValBuilder<_,_,_,_>, ValSpine<(Row, DdirTime), (Row, DdirTime), DdirTime, Diff>, _>( + "DistinctWitnessTime", + |_k, vals, out| { + if let Some((v_t, _)) = vals.iter().min_by_key(|(v_t, _)| (v_t.1.clone(), v_t.0.clone())) { + out.push(((*v_t).clone(), 1)); + } + }, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v, t, r)| ((key.clone(), v), t, r))); }, + ); + let pushed = chosen_arr.as_collection(move |k_t, v_t| { + let (k, _t_dep) = k_t.clone(); + let (v, t_in) = v_t.clone(); + (input_id, k, v, t_in) + }); + dep_scope.contribs.push(pushed); + }, + parse::Reducer::Min => { + // dep at id is (k, min_v); the explanation just needs one input (k, min_v). + // We emit (input_id, k, min_v) directly — this row exists in the input + // because the forward Min output IS an input value. + let pushed = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(move |(_, k, v, t)| (input_id, k, v, t)); + dep_scope.contribs.push(pushed); + }, + parse::Reducer::Count => { + // Count requires every input (k, v) at queried key with t_in ≤ T_dep. 
+ let dep_kt = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(|(_, k, _, t_dep)| (k, t_dep)); + let dep_kt_arr = dep_kt.arrange_by_key(); + let input_lifted_arr = lift_times(a.as_collection(|k, v| (k.clone(), v.clone()))) + .map(|((k, v), t)| (k, (v, t))) + .arrange_by_key(); + let pushed = dep_kt_arr.join_core(input_lifted_arr, move |k, t_dep, v_t_in| { + let (v, t_in) = v_t_in; + if PartialOrder::less_equal(t_in, t_dep) { + Some((input_id, k.clone(), v.clone(), t_in.clone())) + } else { None } + }); + dep_scope.contribs.push(pushed); + }, + } + }, + Node::Variable => { + // User Variable at user-depth `level` (>=1) uses coord `level`. + // (coord 0 is reserved for the dep VecVariable.) + let coord = level; + let step: Product> = Product::new(0, feedback_summary::(coord + 1, 1)); + let (var, col) = VecVariable::new(scope.clone(), step); + nodes.insert(id, Rendered::Collection(col)); + variables.insert(id, (var, level)); + // Reverse rules attach via the eventual Bind. No contribution here. + }, + Node::Inspect { input, label } => { + let col = nodes[input].collection(); + let label = label.clone(); + nodes.insert(id, Rendered::Collection(col.inspect(move |x| eprintln!(" [{}] {:?}", label, x.clone())))); + // Pass-through. + let dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let input_id = *input; + let pushed = dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(move |(_, k, v, t)| (input_id, k, v, t)); + dep_scope.contribs.push(pushed); + }, + Node::Leave(inner_id, scope_level) => { + nodes.insert(id, Rendered::Collection(nodes[inner_id].collection().leave_dynamic(*scope_level))); + // Reverse: depends-at-id (in outer dep scope) propagates to depends-at-inner_id + // in the inner dep scope (a different VecVariable in the same timely scope). 
+ let outer_dep = dep_scope.dep_col.clone(); + let id_for_filter = id; + let inner_id_for_map = *inner_id; + let pushed = outer_dep.filter(move |(i, _, _, _)| *i == id_for_filter) + .map(move |(_, k, v, t)| (inner_id_for_map, k, v, t)); + let _ = scope_level; + dep_scope.contribs.push(pushed); + }, + Node::Scope => { + level += 1; + }, + Node::EndScope => { + level -= 1; + }, + Node::Bind { variable, value } => { + let c = nodes[value].collection(); + let (var, _) = variables.remove(variable).expect("Bind: variable not found"); + var.set(c); + // Reverse: depends-at-variable propagates to depends-at-value (and vice versa, + // since they're aliased in the forward fixpoint). + let dep = dep_scope.dep_col.clone(); + let var_id = *variable; + let val_id = *value; + let v_to_var = dep.clone().filter(move |(i, _, _, _)| *i == val_id).map(move |(_, k, v, t)| (var_id, k, v, t)); + let var_to_v = dep.filter(move |(i, _, _, _)| *i == var_id).map(move |(_, k, v, t)| (val_id, k, v, t)); + dep_scope.contribs.push(v_to_var); + dep_scope.contribs.push(var_to_v); + }, + } + } + + // Deferred seed: tag query rows with first-time of the queried node's collection. + // For correctness, query_id should refer to a node whose collection lives at the relevant + // INNER iterative time (not after leave_dynamic), so first-time reflects propagation iterations. + { + let qid = query_id; + let qcoll = nodes.get(&qid).expect("query_id must reference a rendered node").collection(); + let qfirst = first_time_arr(qcoll); + let q_arr = queries.clone().map(|kv| (kv, ())).arrange_by_key(); + let seeded = q_arr.join_core(qfirst, move |kv, _, t_first| { + Some((qid, kv.0.clone(), kv.1.clone(), t_first.clone())) + }); + dep_scope.contribs.push(seeded); + } + + // Finalize the dep scope: concat contribs, distinct, bind the VecVariable. + // distinct (via arrange_by_self + reduce_abelian) bounds multiplicities to {0,1}; + // contributions arrive via multiple paths, so plain consolidate would diverge. 
+ let DepScope { var, dep_col, contribs } = dep_scope; + let outer_dep_col = dep_col.clone(); + let combined = contribs.into_iter().fold(dep_col.filter(|_| false), |acc, c| acc.concat(c)); + let combined_keyed = combined.map(|d| (d, ())); + let combined_arr = combined_keyed.arrange_by_key(); + let distinct: DepCol<'scope, DdirTime> = combined_arr.reduce_abelian::<_, differential_dataflow::trace::implementations::ValBuilder<_,_,_,_>, ValSpine, _>( + "DepDistinct", + |_key, _vals, output| { output.push(((), 1)); }, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v, t, r)| ((key.clone(), v), t, r))); }, + ).as_collection(|k, _| k.clone()); + var.set(distinct); + + let collections: HashMap> = + nodes.into_iter().filter_map(|(id, r)| match r { Rendered::Collection(c) => Some((id, c)), _ => None }).collect(); + (collections, outer_dep_col) +} + +/// Pack two rows into a single row with a separator: [a..., -1, b...]. +/// (Negative len marker; values don't include negative numbers in our test programs. +/// For the spike this is fine; replace with proper encoding later.) +pub fn pack_pair(a: &Row, b: &Row) -> Row { + let mut r = Row::new(); + r.push(a.len() as i64); + for &x in a.iter() { r.push(x); } + for &x in b.iter() { r.push(x); } + r +} + +pub fn unpack_pair(r: &Row) -> (Row, Row) { + let n = r[0] as usize; + let mut a = Row::new(); + let mut b = Row::new(); + for &x in &r[1..1+n] { a.push(x); } + for &x in &r[1+n..] 
{ b.push(x); } + (a, b) +} + +pub fn pack_triple(a: &Row, b: &Row, c: &Row) -> Row { + let mut r = Row::new(); + r.push(a.len() as i64); + r.push(b.len() as i64); + for &x in a.iter() { r.push(x); } + for &x in b.iter() { r.push(x); } + for &x in c.iter() { r.push(x); } + r +} + +pub fn unpack_triple(r: &Row) -> (Row, Row, Row) { + let na = r[0] as usize; + let nb = r[1] as usize; + let mut a = Row::new(); + let mut b = Row::new(); + let mut c = Row::new(); + let base = 2; + for &x in &r[base..base+na] { a.push(x); } + for &x in &r[base+na..base+na+nb] { b.push(x); } + for &x in &r[base+na+nb..] { c.push(x); } + (a, b, c) +} diff --git a/interactive/examples/ddir_expl.rs b/interactive/examples/ddir_expl.rs new file mode 100644 index 000000000..cd7d014fb --- /dev/null +++ b/interactive/examples/ddir_expl.rs @@ -0,0 +1,112 @@ +//! DD IR explanation backend (spike). Bench driver: gen_row inputs, query + +//! advance time + dump dep collection. + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +#[path = "common/ddir_expl_lib.rs"] +mod expl; + +use differential_dataflow::dynamic::pointstamp::PointStamp; +use differential_dataflow::input::Input; + +use interactive::parse; +use interactive::ir::{Program, Diff, Id}; +use expl::{Row, render_program}; + +fn run(name: &str, stmts: Vec, n_inputs: usize, nodes: u64, edges: u64, arity: usize, query_id: Id, query_keyvals: Vec<(Row, Row)>) { + let mut compiled: Program = interactive::lower::lower(stmts); + println!("{}: {} IR nodes (before optimize)", name, compiled.nodes.len()); + compiled.optimize(); + println!("{}: {} IR nodes (after optimize), result = {}", name, compiled.nodes.len(), compiled.result); + compiled.dump(); + let name = name.to_string(); + let qid = query_id; + + timely::execute_from_args(std::env::args().skip(99), move |worker| { + let (mut inputs, mut queries_handle, probe, dep_probe_in) = worker.dataflow::(|scope| { + let mut input_handles = Vec::new(); + let mut 
input_collections = Vec::new(); + for _ in 0..n_inputs { + let (h, c) = scope.new_collection::<(Row, Row), Diff>(); + input_handles.push(h); input_collections.push(c); + } + let (qh, qc) = scope.new_collection::<(Row, Row), Diff>(); + + let mut probe = timely::dataflow::ProbeHandle::new(); + let mut dep_probe = timely::dataflow::ProbeHandle::new(); + + let (output, dep_outer) = scope.iterative::, _, _>(|inner| { + let entered_inputs: Vec<_> = input_collections.iter().map(|c| c.clone().enter(inner)).collect(); + let entered_queries = qc.clone().enter(inner); + let (rendered, dep) = render_program(&compiled, inner.clone(), &entered_inputs, &entered_queries, qid); + let out = rendered[&compiled.result].clone().leave(scope); + let dep_out = dep.leave(scope); + (out, dep_out) + }); + + output.probe_with(&mut probe); + let dep_inspect = dep_outer.inspect(|x| eprintln!("dep: {:?}", x.clone())); + dep_inspect.probe_with(&mut dep_probe); + (input_handles, qh, probe, dep_probe) + }); + + let index = worker.index(); + let peers = worker.peers(); + + let timer = std::time::Instant::now(); + for e in 0..edges { + if (e as usize) % peers == index { + let input_idx = (e as usize) % inputs.len(); + inputs[input_idx].update(interactive::gen_row::(e, nodes, arity), 1); + } + } + for i in inputs.iter_mut() { i.advance_to(1); i.flush(); } + queries_handle.advance_to(1); queries_handle.flush(); + while probe.less_than(&1u64) || dep_probe_in.less_than(&1u64) { worker.step(); } + println!("worker {}: {} loaded ({} edges, total {:.2?})", index, name, edges, timer.elapsed()); + + if index == 0 { + for (k, v) in &query_keyvals { + queries_handle.update((k.clone(), v.clone()), 1); + } + } + for i in inputs.iter_mut() { i.advance_to(2); i.flush(); } + queries_handle.advance_to(2); queries_handle.flush(); + while probe.less_than(&2u64) || dep_probe_in.less_than(&2u64) { worker.step(); } + println!("worker {}: {} queried (total {:.2?})", index, name, timer.elapsed()); + }).unwrap(); +} + +fn 
main() { + let program = std::env::args().nth(1).expect("usage: ddir_expl [k0,..|v0,.. ...]"); + let arity: usize = std::env::args().nth(2).unwrap_or("2".into()).parse().unwrap(); + let nodes: u64 = std::env::args().nth(3).unwrap_or("8".into()).parse().unwrap(); + let edges: u64 = std::env::args().nth(4).unwrap_or_else(|| (2 * nodes).to_string()).parse().unwrap(); + let query_id: Id = std::env::args().nth(5).unwrap_or("14".into()).parse().unwrap(); + let query_keyvals: Vec<(Row, Row)> = std::env::args().skip(6).map(|s| { + let parts: Vec<&str> = s.split('|').collect(); + let parse_row = |p: &str| -> Row { + let mut r = Row::new(); + if !p.is_empty() { + for x in p.split(',') { r.push(x.parse().unwrap()); } + } + r + }; + let k = parse_row(parts[0]); + let v = if parts.len() > 1 { parse_row(parts[1]) } else { Row::new() }; + (k, v) + }).collect(); + + let source = interactive::load_program(&program); + let stmts = if program.ends_with(".ddp") { + parse::pipe::parse(&source) + } else { + parse::applicative::parse(&source) + }; + let n_inputs = interactive::count_inputs(&stmts); + let name = std::path::Path::new(&program).file_stem().map(|s| s.to_string_lossy().into_owned()).unwrap_or(program.clone()); + run(&name, stmts, n_inputs, nodes, edges, arity, query_id, query_keyvals); +} diff --git a/interactive/examples/reach_explain.rs b/interactive/examples/reach_explain.rs new file mode 100644 index 000000000..0d9f20140 --- /dev/null +++ b/interactive/examples/reach_explain.rs @@ -0,0 +1,222 @@ +//! Reach explanation demo. +//! +//! Loads `reach.ddir`, generates `edges` and `roots` via `gen_row` (the same +//! deterministic-hash inputs the bench uses), inspects `reach::reach` to find +//! a reachable node, then asks for an explanation. The point is that even as +//! the input grows, the explanation stays compact. +//! +//! Usage: +//! 
reach_explain [] + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +#[path = "common/ddir_expl_lib.rs"] +mod expl; + +use std::sync::mpsc; +use differential_dataflow::dynamic::pointstamp::PointStamp; +use differential_dataflow::input::Input; +use timely::dataflow::ProbeHandle; + +use interactive::parse; +use interactive::ir::{Program, Diff, Id}; +use expl::{Row, render_program}; + +/// Pretty-print an explanation as labeled lines, sign + multiplicity. +/// Input 0 (edges) is `(src, dst)` — print as-is. +/// Input 1 (roots) is `(node, junk)` — gen_row produces 2-element rows but the IR +/// only uses the first field. Show just the node. +fn print_explanation(label: &str, mut rows: Vec<(Id, Row, Row, i64)>) { + rows.sort(); + println!("{}:", label); + for (id, k, v, w) in rows { + let sign = if w > 0 { "+" } else { "-" }; + match id { + 0 => { + let s = k.as_slice().get(0).copied().unwrap_or(0); + let d = k.as_slice().get(1).copied().unwrap_or(0); + println!(" {} edge ({}, {})", sign, s, d); + }, + 1 => { + let n = k.as_slice().get(0).copied().unwrap_or(0); + println!(" {} root {}", sign, n); + }, + other => { + let _ = v; + println!(" [unknown input id {}]", other); + }, + } + } +} + +fn format_row(r: &Row) -> String { + r.as_slice().iter().map(|x| x.to_string()).collect::>().join(", ") +} + +fn run(nodes_n: u64, edges_n: u64, user_query: Option) { + let source = interactive::load_program("interactive/examples/programs/reach.ddir"); + let stmts = parse::applicative::parse(&source); + let n_inputs = interactive::count_inputs(&stmts); + let mut compiled: Program = interactive::lower::lower(stmts); + compiled.optimize(); + + // We hardcode the IR ids we care about. After lowering+optimizing reach.ddir these are stable: + // id 0: Input(0) -- edges + // id 1: Linear(0, ...) + // id 2: Input(1) -- roots + // id 3: Linear(2, ...) 
+ // id 14: Leave(5, 1) -- reach::reach lifted out of the loop + // id 17: Inspect -- result + // Query the INNER reach Variable (id 5) rather than its Leave (id 14): we want first-time + // that reflects the actual propagation iteration, not the truncated outer time. + let reach_id: Id = 5; + + timely::execute_from_args(std::iter::empty::(), move |worker| { + let (reach_tx, reach_rx) = mpsc::channel::<(Row, Row, i64)>(); + let (dep_tx, dep_rx) = mpsc::channel::<(Id, Row, Row, i64)>(); + let (mut inputs, mut queries_handle, probe, dep_probe_in) = worker.dataflow::(|scope| { + let mut input_handles = Vec::new(); + let mut input_collections = Vec::new(); + for _ in 0..n_inputs { + let (h, c) = scope.new_collection::<(Row, Row), Diff>(); + input_handles.push(h); input_collections.push(c); + } + let (qh, qc) = scope.new_collection::<(Row, Row), Diff>(); + + let mut probe = ProbeHandle::new(); + let mut dep_probe = ProbeHandle::new(); + + let (reach_out, dep_outer) = scope.iterative::, _, _>(|inner| { + let entered_inputs: Vec<_> = input_collections.iter().map(|c| c.clone().enter(inner)).collect(); + let entered_queries = qc.clone().enter(inner); + let (rendered, dep) = render_program(&compiled, inner.clone(), &entered_inputs, &entered_queries, reach_id); + let r = rendered[&reach_id].clone().leave(scope); + let d = dep.leave(scope); + (r, d) + }); + + // Send reach output to channel for sampling. + let reach_tx_c = reach_tx.clone(); + reach_out.inspect(move |((k, v), _t, w)| { let _ = reach_tx_c.send((k.clone(), v.clone(), *w as i64)); }).probe_with(&mut probe); + // Send dep output to channel, filter to Input ids only (0 = edges, 2 = roots). + // We remap input id 2 (roots in IR) to 1 for nicer printing — see print_explanation. 
+ let dep_tx_c = dep_tx.clone(); + dep_outer.inspect(move |((id, k, v, _t_dep), _t, w)| { + let printed_id = match *id { 0 => 0, 2 => 1, _ => return }; + let _ = dep_tx_c.send((printed_id, k.clone(), v.clone(), *w as i64)); + }).probe_with(&mut dep_probe); + + (input_handles, qh, probe, dep_probe) + }); + // Drop our copies of the senders so the receivers shut down with the workers. + drop(reach_tx); drop(dep_tx); + + // Phase 1: load N edges + roots. + let timer = std::time::Instant::now(); + for e in 0..edges_n { + // Sparse roots: 1 root per 50 edges, so most nodes need edges to be reached. + let input_idx = if e % 50 == 0 { 1 } else { 0 }; + inputs[input_idx].update(interactive::gen_row::(e, nodes_n, 2), 1); + } + for i in inputs.iter_mut() { i.advance_to(1); i.flush(); } + queries_handle.advance_to(1); queries_handle.flush(); + while probe.less_than(&1u64) || dep_probe_in.less_than(&1u64) { worker.step(); } + let load_t = timer.elapsed(); + + // Drain reach output, take the FULL reachable set as deltas at time 0. + let mut reach_set: std::collections::BTreeMap<(Row, Row), i64> = Default::default(); + while let Ok((k, v, w)) = reach_rx.try_recv() { + *reach_set.entry((k, v)).or_default() += w; + } + // Drop transient queries dep state from load phase. + while dep_rx.try_recv().is_ok() {} + + let reachable: Vec = reach_set.iter().filter(|(_, &w)| w > 0) + .filter_map(|((k, _), _)| k.as_slice().first().copied().map(|x| x as u64)) + .collect(); + println!("loaded {} edges + roots ({} reachable nodes, {:.2?})", edges_n, reachable.len(), load_t); + let sample: Vec = reachable.iter().take(10).copied().collect(); + println!("sample reachable nodes: {:?}", sample); + if reachable.is_empty() { + println!("nothing reachable; try more edges/roots"); + return; + } + + // Phase 2: pick a query. 
+ let q = user_query.filter(|n| reachable.contains(n)).unwrap_or(reachable[reachable.len() / 2]); + println!(); + println!("> explain reach(node = {})", q); + let mut qkey = Row::new(); qkey.push(q as i64); + let qkv = (qkey.clone(), Row::new()); + let q2 = std::time::Instant::now(); + queries_handle.update(qkv.clone(), 1); + for i in inputs.iter_mut() { i.advance_to(2); i.flush(); } + queries_handle.advance_to(2); queries_handle.flush(); + while probe.less_than(&2u64) || dep_probe_in.less_than(&2u64) { worker.step(); } + let qt = q2.elapsed(); + + // Cumulative explanation state: for each (id, k, v), track current weight. + let mut acc: std::collections::BTreeMap<(Id, Row, Row), i64> = Default::default(); + while let Ok((id, k, v, w)) = dep_rx.try_recv() { + *acc.entry((id, k, v)).or_default() += w; + } + let snapshot: Vec<(Id, Row, Row, i64)> = acc.iter() + .filter(|(_, &w)| w > 0) + .map(|((id, k, v), &w)| (*id, k.clone(), v.clone(), w)) + .collect(); + print_explanation("explanation", snapshot); + println!("(query elapsed: {:.2?})", qt); + + // Phase 3: age edges sequentially. Round r retracts gen_row index (r-1) and + // inserts gen_row index (edges_n + r - 1). Watch the watched node's explanation + // change when an aged edge happens to be on its path. 
+ let rounds: u64 = std::env::var("ROUNDS").ok().and_then(|s| s.parse().ok()).unwrap_or(20); + let label = |i: usize| if i == 0 { "edge" } else { "root" }; + for r in 1..=rounds { + let time_now = 2 + r; + let retract_idx = r - 1; + let new_idx = edges_n + r - 1; + let retract_input: usize = if retract_idx % 50 == 0 { 1 } else { 0 }; + let new_input: usize = if new_idx % 50 == 0 { 1 } else { 0 }; + let retract_row = interactive::gen_row::(retract_idx, nodes_n, 2); + let new_row = interactive::gen_row::(new_idx, nodes_n, 2); + inputs[retract_input].update(retract_row.clone(), -1); + inputs[new_input].update(new_row.clone(), 1); + for i in inputs.iter_mut() { i.advance_to(time_now); i.flush(); } + queries_handle.advance_to(time_now); queries_handle.flush(); + let r_t = std::time::Instant::now(); + while probe.less_than(&time_now) || dep_probe_in.less_than(&time_now) { worker.step(); } + let rt_elapsed = r_t.elapsed(); + + let mut delta: std::collections::BTreeMap<(Id, Row, Row), i64> = Default::default(); + while let Ok((id, k, v, w)) = dep_rx.try_recv() { + *delta.entry((id, k, v)).or_default() += w; + } + for ((id, k, v), dw) in &delta { + let cur = acc.entry((*id, k.clone(), v.clone())).or_default(); + *cur += dw; + } + let delta_rows: Vec<(Id, Row, Row, i64)> = delta.into_iter() + .filter(|(_, dw)| *dw != 0) + .map(|((id, k, v), dw)| (id, k, v, dw)) + .collect(); + if !delta_rows.is_empty() { + println!(); + println!("[round {}] - {} ({}) + {} ({}) ({:.2?})", r, + label(retract_input), format_row(&retract_row.0), + label(new_input), format_row(&new_row.0), rt_elapsed); + print_explanation(" delta", delta_rows); + } + } + }).unwrap(); +} + +fn main() { + let nodes: u64 = std::env::args().nth(1).unwrap_or("100".into()).parse().unwrap(); + let edges: u64 = std::env::args().nth(2).unwrap_or_else(|| (4 * nodes).to_string()).parse().unwrap(); + let q: Option = std::env::args().nth(3).and_then(|s| s.parse().ok()); + run(nodes, edges, q); +}