Skip to content

Commit e2c5e52

Browse files
committed
Add Arranged::as_container convenience method
1 parent bca29a9 commit e2c5e52

1 file changed

Lines changed: 26 additions & 4 deletions

File tree

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,29 @@ where
130130
}
131131
}
132132

133-
/// Flattens the stream into a `Collection`.
133+
/// Extracts a collection of any container from the stream of batches.
134+
///
135+
/// This method is like `self.stream.flat_map`, except that it produces containers
136+
/// directly, rather than form a container of containers as `flat_map` would.
137+
pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<G, I::Item>
138+
where
139+
I: IntoIterator<Item: Container>,
140+
L: FnMut(Tr::Batch) -> I+'static,
141+
{
142+
self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
143+
input.for_each(|time, data| {
144+
let mut session = output.session(&time);
145+
for wrapper in data.drain(..) {
146+
for mut container in logic(wrapper) {
147+
session.give_container(&mut container);
148+
}
149+
}
150+
});
151+
})
152+
.as_collection()
153+
}
154+
155+
/// Flattens the stream into a `VecCollection`.
134156
///
135157
/// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
136158
/// and this method should only be used when the data need to be transformed or exchanged, rather than
@@ -142,7 +164,7 @@ where
142164
self.flat_map_ref(move |key, val| Some(logic(key,val)))
143165
}
144166

145-
/// Flattens the stream into a `Collection`.
167+
/// Flattens the stream into a `VecCollection`.
146168
///
147169
/// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
148170
/// and this method should only be used when the data need to be transformed or exchanged, rather than
@@ -155,7 +177,7 @@ where
155177
self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
156178
}
157179

158-
/// Extracts elements from an arrangement as a collection.
180+
/// Extracts elements from an arrangement as a `VecCollection`.
159181
///
160182
/// The supplied logic may produce an iterator over output values, allowing either
161183
/// filtering or flat mapping as part of the extraction.
@@ -167,7 +189,7 @@ where
167189
Self::flat_map_batches(self.stream, logic)
168190
}
169191

170-
/// Extracts elements from a stream of batches as a collection.
192+
/// Extracts elements from a stream of batches as a `VecCollection`.
171193
///
172194
/// The supplied logic may produce an iterator over output values, allowing either
173195
/// filtering or flat mapping as part of the extraction.

0 commit comments

Comments
 (0)