From e2c5e52c13f9d7eab6443887ad2485089fbf36d0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 22 Mar 2026 20:46:38 -0400 Subject: [PATCH] Add Arranged::as_container convenience method --- .../src/operators/arrange/arrangement.rs | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index a894d5486..69965e65b 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -130,7 +130,29 @@ where } } - /// Flattens the stream into a `Collection`. + /// Extracts a collection of any container from the stream of batches. + /// + /// This method is like `self.stream.flat_map`, except that it produces containers + /// directly, rather than form a container of containers as `flat_map` would. + pub fn as_container(self, mut logic: L) -> crate::Collection + where + I: IntoIterator, + L: FnMut(Tr::Batch) -> I+'static, + { + self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| { + input.for_each(|time, data| { + let mut session = output.session(&time); + for wrapper in data.drain(..) { + for mut container in logic(wrapper) { + session.give_container(&mut container); + } + } + }); + }) + .as_collection() + } + + /// Flattens the stream into a `VecCollection`. /// /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than @@ -142,7 +164,7 @@ where self.flat_map_ref(move |key, val| Some(logic(key,val))) } - /// Flattens the stream into a `Collection`. + /// Flattens the stream into a `VecCollection`. /// /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than @@ -155,7 +177,7 @@ where self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))]) } - /// Extracts elements from an arrangement as a collection. + /// Extracts elements from an arrangement as a `VecCollection`. /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction. @@ -167,7 +189,7 @@ where Self::flat_map_batches(self.stream, logic) } - /// Extracts elements from a stream of batches as a collection. + /// Extracts elements from a stream of batches as a `VecCollection`. /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction.