diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 3d5d8c65c..2728de6e2 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -77,7 +77,8 @@ where // .reduce(|_, s, t| t.push((s[0].0.clone(), 1))) // }) - nodes.scope().iterative::(|scope| { + use timely::order::Product; + nodes.scope().scoped::,_,_>("Propagate", |scope| { use crate::operators::iterate::Variable; use crate::trace::implementations::{ValBuilder, ValSpine}; diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index f7bca525f..b730302e7 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -6,32 +6,11 @@ use std::hash::Hash; use timely::dataflow::*; use crate::{VecCollection, ExchangeData}; -use crate::operators::*; use crate::lattice::Lattice; use crate::difference::{Abelian, Multiply}; use super::propagate::propagate; -/// Iteratively removes nodes with no in-edges. -pub fn trim(graph: VecCollection) -> VecCollection -where - G: Scope, - N: ExchangeData + Hash, - R: ExchangeData + Abelian, - R: Multiply, - R: From, -{ - graph.clone().iterate(|scope, edges| { - // keep edges from active edge destinations. - let graph = graph.enter(&scope); - let active = - edges.map(|(_src,dst)| dst) - .threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) }); - - graph.semijoin(active) - }) -} - /// Returns the subset of edges in the same strongly connected component. pub fn strongly_connected(graph: VecCollection) -> VecCollection where @@ -41,10 +20,18 @@ where R: Multiply, R: From { - graph.clone().iterate(|scope, inner| { + use timely::order::Product; + graph.scope().scoped::,_,_>("StronglyConnected", |scope| { + // Bring in edges and transposed edges. let edges = graph.enter(&scope); let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); - trim_edges(trim_edges(inner, edges), trans) + // Create a new variable that will be intra-scc edges. + use crate::operators::iterate::Variable; + let (variable, inner) = Variable::new_from(edges.clone(), Product::new(Default::default(), 1)); + + let result = trim_edges(trim_edges(inner, edges), trans); + variable.set(result.clone()); + result.leave() }) } @@ -57,16 +44,24 @@ where R: Multiply, R: From { - let nodes = edges.clone() - .map_in_place(|x| x.0 = x.1.clone()) - .consolidate(); + edges.inner.scope().region_named("TrimEdges", |region| { + let cycle = cycle.enter_region(region); + let edges = edges.enter_region(region); + + let nodes = edges.clone() + .map_in_place(|x| x.0 = x.1.clone()) + .consolidate(); - // NOTE: With a node -> int function, can be improved by: - // let labels = propagate_at(&cycle, &nodes, |x| *x as u64); - let labels = propagate(cycle, nodes); + // NOTE: With a node -> int function, can be improved by: + // let labels = propagate_at(&cycle, &nodes, |x| *x as u64); + let labels = propagate(cycle, nodes).arrange_by_key(); - edges.join_map(labels.clone(), |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone()))) - .join_map(labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))) - .filter(|(_,(l1,l2))| l1 == l2) - .map(|((x1,x2),_)| (x2,x1)) + edges.arrange_by_key() + .join_core(labels.clone(), |e1,e2,l1| [(e2.clone(),(e1.clone(),l1.clone()))]) + .arrange_by_key() + .join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))]) + .filter(|(_,(l1,l2))| l1 == l2) + .map(|((x1,x2),_)| (x2,x1)) + .leave_region() + }) }