diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 2ee9919c1..ed9e1fde7 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -55,7 +55,7 @@ impl Branch for Stream { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); - input.activate().for_each_time(|time, data| { + input.for_each_time(|time, data| { let mut out1 = output1_handle.session(&time); let mut out2 = output2_handle.session(&time); for datum in data.flat_map(|d| d.drain(..)) { @@ -115,7 +115,7 @@ impl BranchWhen for StreamCore { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); - input.activate().for_each_time(|time, data| { + input.for_each_time(|time, data| { let mut out = if condition(time.time()) { output2_handle.session(&time) } else { diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 32a0d1fd2..53746c6af 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -65,7 +65,7 @@ impl OkErr for StreamCore { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); - input.activate().for_each_time(|time, data| { + input.for_each_time(|time, data| { let mut out1 = output1_handle.session(&time); let mut out2 = output2_handle.session(&time); for datum in data.flat_map(|d| d.drain()) { diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 16f41b21c..74932aaf8 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -19,25 +19,6 @@ use crate::container::{CapacityContainerBuilder, PushInto}; use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; -#[must_use] -pub struct InputSession<'a, T: Timestamp, C, P: Pull>> { - input: &'a mut InputHandleCore, -} - -impl<'a, T: Timestamp, C: Accountable, P: Pull>> InputSession<'a, T, C, P> { - /// Iterates through distinct capabilities and the lists of containers associated with each. - pub fn for_each_time(self, logic: F) where F: FnMut(InputCapability, std::slice::IterMut::), C: Default { - self.input.for_each_time(logic) - } - /// Iterates through pairs of capability and container. - /// - /// The `for_each_time` method is equivalent, but groups containers by capability and is preferred, - /// in that it often leads to grouping work by capability, including the creation of output sessions. - pub fn for_each(self, logic: F) where F: FnMut(InputCapability, &mut C) { - self.input.for_each(logic) - } -} - /// Handle to an operator's input stream. pub struct InputHandleCore>> { pull_counter: PullCounter, @@ -53,15 +34,11 @@ pub struct InputHandleCore>> { } impl>> InputHandleCore { - - /// Activates an input handle with a session that reorders inputs and must be drained. - pub fn activate(&mut self) -> InputSession<'_, T, C, P> { InputSession { input: self } } - /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. #[inline] - pub fn next(&mut self) -> Option<(InputCapability, &mut C)> { + fn next(&mut self) -> Option<(InputCapability, &mut C)> { let internal = &self.internal; let summaries = &self.summaries; self.pull_counter.next_guarded().map(|(guard, bundle)| { @@ -94,10 +71,6 @@ impl>> InputHandleCore>>(input: &mut InputHandleCore) -> &mut PullCounter { - &mut input.pull_counter -} - /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. pub fn new_input_handle>>( diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index a4ae2f129..03e9b67a1 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -4,7 +4,7 @@ use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pact::ParallelizationContract; -use crate::dataflow::operators::generic::handles::{InputSession, OutputBuilderSession, OutputBuilder}; +use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder}; use crate::dataflow::operators::capability::Capability; use crate::dataflow::{Scope, StreamCore}; @@ -57,7 +57,7 @@ pub trait Operator { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), + L: FnMut((&mut InputHandleCore, &MutableAntichain), &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract; @@ -87,7 +87,7 @@ pub trait Operator { /// }); /// ``` fn unary_notify, + L: FnMut(&mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> @@ -123,7 +123,7 @@ pub trait Operator { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, + L: FnMut(&mut InputHandleCore, &mut OutputBuilderSession)+'static, P: ParallelizationContract; @@ -180,8 +180,8 @@ pub trait Operator { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), - (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), + L: FnMut((&mut InputHandleCore, &MutableAntichain), + (&mut InputHandleCore, &MutableAntichain), &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -229,8 +229,8 @@ pub trait Operator { /// ``` fn binary_notify, - InputSession<'_, G::Timestamp, C2, P2::Puller>, + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P1: ParallelizationContract, @@ -268,8 +268,8 @@ pub trait Operator { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, - InputSession<'_, G::Timestamp, C2, P2::Puller>, + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -299,7 +299,7 @@ pub trait Operator { /// ``` fn sink(&self, pact: P, name: &str, logic: L) where - L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain))+'static, + L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, P: ParallelizationContract; } @@ -309,7 +309,7 @@ impl Operator for StreamCore { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), + L: FnMut((&mut InputHandleCore, &MutableAntichain), &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract { @@ -326,7 +326,7 @@ impl Operator for StreamCore { let mut logic = constructor(capability, operator_info); move |frontiers| { let mut output_handle = output.activate(); - logic((input.activate(), &frontiers[0]), &mut output_handle); + logic((&mut input, &frontiers[0]), &mut output_handle); } }); @@ -334,7 +334,7 @@ impl Operator for StreamCore { } fn unary_notify, + L: FnMut(&mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> @@ -358,7 +358,7 @@ impl Operator for StreamCore { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, + L: FnMut(&mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P: ParallelizationContract { @@ -374,7 +374,7 @@ impl Operator for StreamCore { // `capabilities` should be a single-element vector. let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); - move |_frontiers| logic(input.activate(), &mut output.activate()) + move |_frontiers| logic(&mut input, &mut output.activate()) }); stream @@ -385,8 +385,8 @@ impl Operator for StreamCore { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), - (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), + L: FnMut((&mut InputHandleCore, &MutableAntichain), + (&mut InputHandleCore, &MutableAntichain), &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -405,7 +405,7 @@ impl Operator for StreamCore { let mut logic = constructor(capability, operator_info); move |frontiers| { let mut output_handle = output.activate(); - logic((input1.activate(), &frontiers[0]), (input2.activate(), &frontiers[1]), &mut output_handle); + logic((&mut input1, &frontiers[0]), (&mut input2, &frontiers[1]), &mut output_handle); } }); @@ -414,8 +414,8 @@ impl Operator for StreamCore { fn binary_notify, - InputSession<'_, G::Timestamp, C2, P2::Puller>, + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P1: ParallelizationContract, @@ -443,8 +443,8 @@ impl Operator for StreamCore { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, - InputSession<'_, G::Timestamp, C2, P2::Puller>, + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -464,7 +464,7 @@ impl Operator for StreamCore { let mut logic = constructor(capability, operator_info); move |_frontiers| { let mut output_handle = output.activate(); - logic(input1.activate(), input2.activate(), &mut output_handle); + logic(&mut input1, &mut input2, &mut output_handle); } }); @@ -473,7 +473,7 @@ impl Operator for StreamCore { fn sink(&self, pact: P, name: &str, mut logic: L) where - L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain))+'static, + L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); @@ -481,7 +481,7 @@ impl Operator for StreamCore { builder.build(|_capabilities| { move |frontiers| { - logic((input.activate(), &frontiers[0])); + logic((&mut input, &frontiers[0])); } }); }