Skip to content

Commit e7a8b1f

Browse files
Partial progress on #757 (#758)
* Set default progress to Demand * Use robust header size method * Pre-allocate with intended length * Remove fn epoch() * Rename MutableAntichain::new_bottom to ::from_elem
1 parent 39ba5a7 commit e7a8b1f

8 files changed

Lines changed: 15 additions & 20 deletions

File tree

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
267267

268268
// Get the header and payload, ditch the header.
269269
let mut peel = bytes.extract_to(header.required_bytes());
270-
let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());
270+
let _ = peel.extract_to(header.header_bytes());
271271

272272
// Increment message count for channel.
273273
// Safe to do this even if the channel has been dropped.

timely/examples/hashjoin.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ fn main() {
9292
sent += to_send;
9393

9494
// Advance input, iterate until data cleared.
95-
let next = input1.epoch() + 1;
95+
let next = input1.time() + 1;
9696
input1.advance_to(next);
9797
input2.advance_to(next);
9898
while probe.less_than(input1.time()) {

timely/examples/unionfind.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ fn main() {
3838
for (edge, arc) in insert.take(edges / peers).enumerate() {
3939
input.send(arc);
4040
if edge % batch == (batch - 1) {
41-
let next = input.epoch() + 1;
41+
let next = input.time() + 1;
4242
input.advance_to(next);
4343
while probe.less_than(input.time()) {
4444
worker.step();

timely/src/dataflow/operators/core/input.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -469,11 +469,6 @@ impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
469469
/// and to begin to shut down operators, as this input can no longer produce data.
470470
pub fn close(self) { }
471471

472-
/// Reports the current epoch.
473-
pub fn epoch(&self) -> &T {
474-
&self.now_at
475-
}
476-
477472
/// Reports the current timestamp.
478473
pub fn time(&self) -> &T {
479474
&self.now_at

timely/src/dataflow/operators/generic/notificator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ fn notificator_delivers_notifications_in_topo_order() {
109109
use crate::order::Product;
110110
use crate::dataflow::operators::capability::Capability;
111111

112-
let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));
112+
let mut frontier = MutableAntichain::from_elem(Product::new(0, 0));
113113

114114
let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new())));
115115

timely/src/progress/broadcast.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
117117
// options for improving it if performance limits users who want other logging.
118118
self.progress_logging.as_ref().map(|l| {
119119

120-
let mut messages = Vec::with_capacity(changes.len());
121-
let mut internal = Vec::with_capacity(changes.len());
120+
let mut messages = Vec::with_capacity(recv_changes.len());
121+
let mut internal = Vec::with_capacity(recv_changes.len());
122122

123123
for ((location, time), diff) in recv_changes.iter() {
124124

timely/src/progress/frontier.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,10 @@ impl<T> Antichain<T> {
219219
///
220220
/// let mut frontier = Antichain::from_elem(2);
221221
///```
222-
pub fn from_elem(element: T) -> Antichain<T> {
222+
pub fn from_elem(element: T) -> Antichain<T> {
223223
let mut elements = SmallVec::with_capacity(1);
224224
elements.push(element);
225-
Antichain { elements }
225+
Antichain { elements }
226226
}
227227

228228
/// Clears the contents of the antichain.
@@ -440,11 +440,11 @@ impl<T> MutableAntichain<T> {
440440
///```
441441
/// use timely::progress::frontier::{AntichainRef, MutableAntichain};
442442
///
443-
/// let mut frontier = MutableAntichain::new_bottom(0u64);
443+
/// let mut frontier = MutableAntichain::from_elem(0u64);
444444
/// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
445445
///```
446446
#[inline]
447-
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
447+
pub fn from_elem(bottom: T) -> MutableAntichain<T>
448448
where
449449
T: Ord+Clone,
450450
{
@@ -477,7 +477,7 @@ impl<T> MutableAntichain<T> {
477477
///```
478478
/// use timely::progress::frontier::MutableAntichain;
479479
///
480-
/// let mut frontier = MutableAntichain::new_bottom(1u64);
480+
/// let mut frontier = MutableAntichain::from_elem(1u64);
481481
/// assert!(!frontier.less_than(&0));
482482
/// assert!(!frontier.less_than(&1));
483483
/// assert!(frontier.less_than(&2));
@@ -497,7 +497,7 @@ impl<T> MutableAntichain<T> {
497497
///```
498498
/// use timely::progress::frontier::MutableAntichain;
499499
///
500-
/// let mut frontier = MutableAntichain::new_bottom(1u64);
500+
/// let mut frontier = MutableAntichain::from_elem(1u64);
501501
/// assert!(!frontier.less_equal(&0));
502502
/// assert!(frontier.less_equal(&1));
503503
/// assert!(frontier.less_equal(&2));
@@ -517,7 +517,7 @@ impl<T> MutableAntichain<T> {
517517
///```
518518
/// use timely::progress::frontier::{AntichainRef, MutableAntichain};
519519
///
520-
/// let mut frontier = MutableAntichain::new_bottom(1u64);
520+
/// let mut frontier = MutableAntichain::from_elem(1u64);
521521
/// let changes =
522522
/// frontier
523523
/// .update_iter(vec![(1, -1), (2, 7)])
@@ -621,7 +621,7 @@ pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
621621
/// ```
622622
/// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter};
623623
///
624-
/// let mut frontier = MutableAntichain::new_bottom(1u64);
624+
/// let mut frontier = MutableAntichain::from_elem(1u64);
625625
/// let changes =
626626
/// vec![(1, -1), (2, 7)]
627627
/// .filter_through(&mut frontier)

timely/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl Config {
112112
#[cfg(feature = "getopts")]
113113
pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
114114
let progress_mode = matches
115-
.opt_get_default("progress-mode", ProgressMode::Eager)?;
115+
.opt_get_default("progress-mode", ProgressMode::Demand)?;
116116
Ok(Config::default().progress_mode(progress_mode))
117117
}
118118

0 commit comments

Comments
 (0)