Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,16 @@ mod reachability {
let result = combined_arr.reduce_abelian::<_,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
>("Distinct", |_node, _input, output| {
output.push(((), 1));
_,
>("Distinct", |_node, _input, output| { output.push(((), 1)); },
|col, key, upds| {
use columnar::{Clear, Push};
col.keys.clear();
col.vals.clear();
col.times.clear();
col.diffs.clear();
for (val, time, diff) in upds.drain(..) { col.push((key, &val, &time, &diff)); }
*col = std::mem::take(col).consolidate();
});

// Extract RecordedUpdates from the Arranged's batch stream.
Expand Down
28 changes: 22 additions & 6 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,11 @@ pub mod vec {
use crate::trace::implementations::{ValBuilder, ValSpine};

self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>,_>(
name,
logic,
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
.as_collection(|k,v| (k.clone(), v.clone()))
}

Expand Down Expand Up @@ -782,7 +786,7 @@ pub mod vec {
/// ```
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static,
T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
Expand All @@ -801,12 +805,16 @@ pub mod vec {
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
V: Clone+'static,
T2: for<'a> Trace<Key<'a>=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static,
T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=G::Timestamp>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core::<_,Bu,_>(name, logic)
.reduce_core::<_,Bu,_,_>(
name,
logic,
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
}
}

Expand Down Expand Up @@ -871,7 +879,11 @@ pub mod vec {
use crate::trace::implementations::{KeyBuilder, KeySpine};

self.arrange_by_self_named(&format!("Arrange: {}", name))
.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>,_>(
name,
move |k,s,t| t.push(((), thresh(k, &s[0].1))),
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
.as_collection(|k,_| k.clone())
}

Expand Down Expand Up @@ -908,7 +920,11 @@ pub mod vec {
pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<G, (K, R), R2> {
use crate::trace::implementations::{ValBuilder, ValSpine};
self.arrange_by_self_named("Arrange: Count")
.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>,_>(
"Count",
|_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))),
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
.as_collection(|k,c| (k.clone(), c.clone()))
}
}
Expand Down
6 changes: 5 additions & 1 deletion differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
}

let reader = TraceAgent {
trace: trace.clone(),

Check warning on line 94 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

using `.clone()` on a ref-counted pointer
queues: Rc::downgrade(&queues),
logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
Expand Down Expand Up @@ -205,7 +205,11 @@
/// // create a second dataflow
/// worker.dataflow(move |scope| {
/// trace.import(scope)
/// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Reduce", |_key, src, dst| dst.push((*src[0].0, 1)))
/// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>(
/// "Reduce",
/// |_key, src, dst| dst.push((*src[0].0, 1)),
/// |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
/// )
/// .as_collection(|k,v| (k.clone(), v.clone()));
/// });
///
Expand Down Expand Up @@ -292,8 +296,8 @@
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(info.address);

Check warning on line 299 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`activator` shadows a previous, unrelated binding
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

Check warning on line 300 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

using `.clone()` on a ref-counted pointer

capabilities.borrow_mut().as_mut().unwrap().insert(capability);

Expand Down Expand Up @@ -427,7 +431,7 @@
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(info.address);

Check warning on line 434 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`activator` shadows a previous, unrelated binding
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down
37 changes: 20 additions & 17 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{Data, VecCollection, AsCollection};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::merge_batcher::container::InternalMerge;

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
Expand Down Expand Up @@ -75,7 +74,6 @@ where
use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;
use timely::container::PushInto;

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -169,12 +167,17 @@ where
/// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
/// supplied as arguments to an operator using the same key-value structure.
pub fn as_vecs(self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
///
/// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support
/// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic
/// on the reference types.
pub fn as_vecs<K, V>(self) -> VecCollection<G, (K, V), Tr::Diff>
where
Tr::KeyOwn: crate::ExchangeData,
Tr::ValOwn: crate::ExchangeData,
K: crate::ExchangeData,
V: crate::ExchangeData,
Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
{
self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
}

/// Extracts elements from an arrangement as a `VecCollection`.
Expand Down Expand Up @@ -271,43 +274,43 @@ where
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_abelian<L, Bu, T2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<G, TraceAgent<T2>>
where
T1: TraceReader<KeyOwn: Ord>,
T1: TraceReader,
T2: for<'a> Trace<
Key<'a>= T1::Key<'a>,
KeyOwn=T1::KeyOwn,
ValOwn: Data,
Time=T1::Time,
Diff: Abelian,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Default>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static,
{
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
self.reduce_core::<_,Bu,T2,_>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
crate::consolidation::consolidate(change);
})
}, push)
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_core<L, Bu, T2, P>(self, name: &str, logic: L, push: P) -> Arranged<G, TraceAgent<T2>>
where
T1: TraceReader<KeyOwn: Ord>,
T1: TraceReader,
T2: for<'a> Trace<
Key<'a>=T1::Key<'a>,
KeyOwn=T1::KeyOwn,
ValOwn: Data,
Time=T1::Time,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Default>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,_,Bu,_,_>(self, name, logic)
reduce_trace::<_,_,Bu,_,_,_>(self, name, logic, push)
}
}

Expand Down
24 changes: 13 additions & 11 deletions differential-dataflow/src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
//! use differential_dataflow::operators::arrange::upsert;
//!
//! let stream = scope.input_from(&mut input);
//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(stream, &"test");
//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>,String,String>(stream, &"test");
//!
//! arranged
//! .as_collection(|k,v| (k.clone(), v.clone()))
Expand Down Expand Up @@ -127,19 +127,21 @@ use super::TraceAgent;
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Bu, Tr>(
stream: Stream<G, Vec<(Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)>>,
pub fn arrange_from_upsert<G, Bu, Tr, K, V>(
stream: Stream<G, Vec<(K, Option<V>, G::Timestamp)>>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
K: ExchangeData+Hashable+std::hash::Hash,
V: ExchangeData,
Tr: for<'a> Trace<
KeyOwn: ExchangeData+Hashable+std::hash::Hash,
ValOwn: ExchangeData,
Key<'a> = &'a K,
Val<'a> = &'a V,
Time: TotalOrder+ExchangeData,
Diff=isize,
>+'static,
Bu: Builder<Time=G::Timestamp, Input = Vec<((Tr::KeyOwn, Tr::ValOwn), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
Bu: Builder<Time=G::Timestamp, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand All @@ -148,7 +150,7 @@ where

let reader = &mut reader;

let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option<Tr::ValOwn>,G::Timestamp)| (update.0).hashed().into());
let exchange = Exchange::new(move |update: &(K,Option<V>,G::Timestamp)| (update.0).hashed().into());

let scope = stream.scope();
stream.unary_frontier(exchange, name, move |_capability, info| {
Expand All @@ -174,7 +176,7 @@ where
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwn, Option<Tr::ValOwn>)>>::new();
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, K, Option<V>)>>::new();
let mut updates = Vec::new();

move |(input, frontier), output| {
Expand Down Expand Up @@ -237,10 +239,10 @@ where
let mut key_con = Tr::KeyContainer::with_capacity(1);
for (key, mut list) in to_process {

key_con.clear(); key_con.push_own(&key);
key_con.clear(); key_con.push_ref(&key);

// The prior value associated with the key.
let mut prev_value: Option<Tr::ValOwn> = None;
let mut prev_value: Option<V> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key(&trace_storage, key_con.index(0));
Expand All @@ -252,7 +254,7 @@ where
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
prev_value = Some(Tr::owned_val(val));
prev_value = Some(val.clone());
}
trace_cursor.step_val(&trace_storage);
}
Expand Down
Loading
Loading