Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ where
// .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
// })

nodes.scope().iterative::<usize,_,_>(|scope| {
use timely::order::Product;
nodes.scope().scoped::<Product<_, usize>,_,_>("Propagate", |scope| {

use crate::operators::iterate::Variable;
use crate::trace::implementations::{ValBuilder, ValSpine};
Expand Down
61 changes: 28 additions & 33 deletions differential-dataflow/src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,11 @@ use std::hash::Hash;
use timely::dataflow::*;

use crate::{VecCollection, ExchangeData};
use crate::operators::*;
use crate::lattice::Lattice;
use crate::difference::{Abelian, Multiply};

use super::propagate::propagate;

/// Iteratively removes nodes with no in-edges.
pub fn trim<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
R: From<i8>,
{
graph.clone().iterate(|scope, edges| {
// keep edges from active edge destinations.
let graph = graph.enter(&scope);
let active =
edges.map(|(_src,dst)| dst)
.threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });

graph.semijoin(active)
})
}

/// Returns the subset of edges in the same strongly connected component.
pub fn strongly_connected<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
Expand All @@ -41,10 +20,18 @@ where
R: Multiply<R, Output=R>,
R: From<i8>
{
graph.clone().iterate(|scope, inner| {
use timely::order::Product;
graph.scope().scoped::<Product<_, usize>,_,_>("StronglyConnected", |scope| {
// Bring in edges and transposed edges.
let edges = graph.enter(&scope);
let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
trim_edges(trim_edges(inner, edges), trans)
// Create a new variable that will be intra-scc edges.
use crate::operators::iterate::Variable;
let (variable, inner) = Variable::new_from(edges.clone(), Product::new(Default::default(), 1));

let result = trim_edges(trim_edges(inner, edges), trans);
variable.set(result.clone());
result.leave()
})
}

Expand All @@ -57,16 +44,24 @@ where
R: Multiply<R, Output=R>,
R: From<i8>
{
let nodes = edges.clone()
.map_in_place(|x| x.0 = x.1.clone())
.consolidate();
edges.inner.scope().region_named("TrimEdges", |region| {
let cycle = cycle.enter_region(region);
let edges = edges.enter_region(region);

let nodes = edges.clone()
.map_in_place(|x| x.0 = x.1.clone())
.consolidate();

// NOTE: With a node -> int function, can be improved by:
// let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
let labels = propagate(cycle, nodes);
// NOTE: With a node -> int function, can be improved by:
// let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
let labels = propagate(cycle, nodes).arrange_by_key();

edges.join_map(labels.clone(), |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
.join_map(labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
.filter(|(_,(l1,l2))| l1 == l2)
.map(|((x1,x2),_)| (x2,x1))
edges.arrange_by_key()
.join_core(labels.clone(), |e1,e2,l1| [(e2.clone(),(e1.clone(),l1.clone()))])
.arrange_by_key()
.join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))])
.filter(|(_,(l1,l2))| l1 == l2)
.map(|((x1,x2),_)| (x2,x1))
.leave_region()
})
}
Loading