Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,29 @@ where
}
}

/// Flattens the stream into a `Collection`.
/// Extracts a collection of any container from the stream of batches.
///
/// This method is like `self.stream.flat_map`, except that it produces containers
/// directly, rather than form a container of containers as `flat_map` would.
pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<G, I::Item>
where
I: IntoIterator<Item: Container>,
L: FnMut(Tr::Batch) -> I+'static,
{
self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.drain(..) {
for mut container in logic(wrapper) {
session.give_container(&mut container);
}
}
});
})
.as_collection()
}

/// Flattens the stream into a `VecCollection`.
///
/// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
Expand All @@ -142,7 +164,7 @@ where
self.flat_map_ref(move |key, val| Some(logic(key,val)))
}

/// Flattens the stream into a `Collection`.
/// Flattens the stream into a `VecCollection`.
///
/// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
Expand All @@ -155,7 +177,7 @@ where
self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
}

/// Extracts elements from an arrangement as a collection.
/// Extracts elements from an arrangement as a `VecCollection`.
///
/// The supplied logic may produce an iterator over output values, allowing either
/// filtering or flat mapping as part of the extraction.
Expand All @@ -167,7 +189,7 @@ where
Self::flat_map_batches(self.stream, logic)
}

/// Extracts elements from a stream of batches as a collection.
/// Extracts elements from a stream of batches as a `VecCollection`.
///
/// The supplied logic may produce an iterator over output values, allowing either
/// filtering or flat mapping as part of the extraction.
Expand Down
Loading