Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
934a0f9
Merge pull request #700 from TimelyDataflow/master
frankmcsherry Mar 25, 2026
eba4fd9
Merge branch 'master' into master-next
frankmcsherry Mar 26, 2026
3d595ed
Slow but correct impls
frankmcsherry Mar 25, 2026
b25fd43
WIP
frankmcsherry Mar 26, 2026
a7e92f6
Wildly overcomplicated
frankmcsherry Mar 29, 2026
d50868e
Merge pull request #704 from frankmcsherry/columnar_work
frankmcsherry Mar 29, 2026
841ddb9
Merge branch 'master' into master-next
frankmcsherry Apr 2, 2026
ee08017
Remove commented code
frankmcsherry Apr 4, 2026
1535c4b
Removed one-off trait
frankmcsherry Apr 4, 2026
e6af458
Remove unread counters
frankmcsherry Apr 4, 2026
8f9e929
Convert silent errors to panics
frankmcsherry Apr 4, 2026
51aef09
Simplify logic
frankmcsherry Apr 4, 2026
3901fdb
More commented code removed
frankmcsherry Apr 4, 2026
b157359
Extract unconditional behavior
frankmcsherry Apr 4, 2026
fadcd7b
Idiomatic Rust
frankmcsherry Apr 4, 2026
2ec3293
Less time cloning
frankmcsherry Apr 4, 2026
8153387
Idiomatic capability use
frankmcsherry Apr 4, 2026
d021540
Ref keyword to borrow
frankmcsherry Apr 4, 2026
7a02834
Further tightening
frankmcsherry Apr 4, 2026
8e75167
Tidy explanatory text, and reflect frontier
frankmcsherry Apr 4, 2026
7c053ad
Tighten comments, remove mutable borrow
frankmcsherry Apr 4, 2026
5281266
Prefer insert_ref to insert
frankmcsherry Apr 4, 2026
33adafb
Merge pull request #709 from frankmcsherry/reduce_tidy
frankmcsherry Apr 4, 2026
671de13
Use containers for interesting (keys, time) (#710)
frankmcsherry Apr 4, 2026
ec8a15d
Merge branch 'master' into master-next
frankmcsherry Apr 7, 2026
1f604d0
Remove TimelyStack and all dependent types (#715)
antiguru Apr 9, 2026
954c2ea
Track timely `master` changes, around `Child` scope changes (#714)
frankmcsherry Apr 9, 2026
4c66ade
Relifetimed scopes (#718)
frankmcsherry Apr 11, 2026
6f4ae4c
Interactive update (#719)
frankmcsherry Apr 11, 2026
6f2f19e
DDIR README
frankmcsherry Apr 11, 2026
56dc613
Continue to track timely master (#720)
frankmcsherry Apr 13, 2026
ddfd05e
Target timely 0.29 (#721)
frankmcsherry Apr 13, 2026
523a501
Separate chunker from batcher
antiguru Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ rust-version = "1.86"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" }
timely = { version = "0.28", default-features = false }
timely = { version = "0.29", default-features = false }
columnar = { version = "0.12", default-features = false }
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[workspace.lints.clippy]
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Alternately, here is a fragment that computes the set of nodes reachable from a
```rust
let reachable =
roots.iterate(|scope, reach|
edges.enter(&scope)
edges.enter(scope)
.semijoin(reach)
.map(|(src, dst)| dst)
.concat(reach)
Expand Down Expand Up @@ -337,7 +337,7 @@ edges.iterate(|scope, inner| {
.map(|(node,_)| node);

// keep edges between active vertices
edges.enter(&scope)
edges.enter(scope)
.semijoin(active)
.map(|(src,dst)| (dst,src))
.semijoin(active)
Expand Down
2 changes: 1 addition & 1 deletion advent_of_code_2017/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ workspace = true

[dependencies]
differential-dataflow = { workspace = true }
timely = { git = "https://github.com/frankmcsherry/timely-dataflow" }
timely = { workspace = true }
4 changes: 2 additions & 2 deletions advent_of_code_2017/src/bin/day_06.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {

let stable = banks.iterate(|scope, iter|
iter.map_in_place(|banks| recycle(banks))
.concat(banks.enter(&scope))
.concat(banks.enter(scope))
.distinct()
);

Expand All @@ -43,7 +43,7 @@ fn main() {
loop_point
.iterate(|scope, iter|
iter.map_in_place(|banks| recycle(banks))
.concat(loop_point.enter(&scope))
.concat(loop_point.enter(scope))
.distinct()
)
.map(|_| ((),()))
Expand Down
4 changes: 2 additions & 2 deletions advent_of_code_2017/src/bin/day_07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,10 +1104,10 @@ tvhftq (35)";

let total_weights: VecCollection<_,String> = weights
.iterate(|scope, inner| {
parents.enter(&scope)
parents.enter(scope)
.semijoin(inner)
.map(|(_, parent)| parent)
.concat(weights.enter(&scope))
.concat(weights.enter(scope))
});

parents
Expand Down
2 changes: 1 addition & 1 deletion advent_of_code_2017/src/bin/day_08.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ wui inc -120 if i > -2038";
.map(|_| ((0, String::new()), 0))
.iterate(|scope, valid| {

let edits = edits.enter(&scope);
let edits = edits.enter(scope);

valid
.prefix_sum_at(edits.map(|(key,_)| key), 0, |_k,x,y| *x + *y)
Expand Down
6 changes: 3 additions & 3 deletions advent_of_code_2017/src/bin/day_09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
if input.len() > 1 { result = combine(result, &(input[1].0).1); }
output.push((result, 1));
})
.concat(unit_ranges.enter(&scope))
.concat(unit_ranges.enter(scope))
)
}

Expand Down Expand Up @@ -154,10 +154,10 @@ where
.iterate(|scope, state| {
aggregates
.filter(|&((_, log),_)| log < 64) // the log = 64 interval doesn't help us here (overflows).
.enter(&scope)
.enter(scope)
.map(|((pos, log), data)| (pos, (log, data)))
.join_map(state, move |&pos, &(log, ref data), state| (pos + (1 << log), combine(state, data)))
.concat(init_state.enter(&scope))
.concat(init_state.enter(scope))
.distinct()
})
.consolidate()
Expand Down
4 changes: 2 additions & 2 deletions advent_of_code_2017/src/bin/day_12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2035,8 +2035,8 @@ fn main() {
let labels =
nodes
.iterate(|scope, label| {
let edges = edges.enter(&scope);
let nodes = nodes.enter(&scope);
let edges = edges.enter(scope);
let nodes = nodes.enter(scope);
label
.join_map(edges, |_src, &lbl, &tgt| (tgt, lbl))
.concat(nodes)
Expand Down
1 change: 1 addition & 0 deletions diagnostics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[package]
name = "diagnostics"
version = "0.1.0"
publish = false
edition.workspace = true
rust-version.workspace = true

Expand Down
60 changes: 29 additions & 31 deletions diagnostics/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
use differential_dataflow::{AsCollection, VecCollection};

use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::{Event, EventLink, Replay, Capture};
Expand Down Expand Up @@ -244,12 +243,11 @@ fn quantize(time: Duration, interval: Duration) -> Duration {
}

/// Quantize timestamps in a collection's inner stream.
fn quantize_collection<S, D>(
collection: VecCollection<S, D, i64>,
fn quantize_collection<'scope, D>(
collection: VecCollection<'scope, Duration, D, i64>,
interval: Duration,
) -> VecCollection<S, D, i64>
) -> VecCollection<'scope, Duration, D, i64>
where
S: Scope<Timestamp = Duration>,
D: differential_dataflow::Data,
{
collection
Expand All @@ -276,7 +274,7 @@ where
///
/// Returns a [`LoggingState`] with trace handles and a [`SinkHandle`] for
/// the WebSocket thread.
pub fn register<A: Allocate>(worker: &mut Worker<A>, log_logging: bool) -> LoggingState {
pub fn register(worker: &mut Worker, log_logging: bool) -> LoggingState {
let start = Instant::now();

// Event links for logging capture (worker-internal, Rc-based).
Expand Down Expand Up @@ -377,8 +375,8 @@ pub fn register<A: Allocate>(worker: &mut Worker<A>, log_logging: bool) -> Loggi
}
}

fn install_loggers<A: Allocate>(
worker: &mut Worker<A>,
fn install_loggers(
worker: &mut Worker,
t_link: Rc<EventLink<Duration, Vec<(Duration, TimelyEvent)>>>,
d_link: Rc<EventLink<Duration, Vec<(Duration, DifferentialEvent)>>>,
) {
Expand All @@ -402,11 +400,11 @@ fn install_loggers<A: Allocate>(
// ============================================================================

/// Internal: collections before arrangement, used for the cross-join.
struct TimelyCollections<S: Scope> {
operators: VecCollection<S, (usize, String, Vec<usize>), i64>,
channels: VecCollection<S, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
elapsed: VecCollection<S, usize, i64>,
messages: VecCollection<S, usize, i64>,
struct TimelyCollections<'scope> {
operators: VecCollection<'scope, Duration, (usize, String, Vec<usize>), i64>,
channels: VecCollection<'scope, Duration, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
elapsed: VecCollection<'scope, Duration, usize, i64>,
messages: VecCollection<'scope, Duration, usize, i64>,
}

#[derive(Default)]
Expand All @@ -416,16 +414,16 @@ struct TimelyDemuxState {
}

/// Build timely logging collections and arrangements.
fn construct_timely<S: Scope<Timestamp = Duration>>(
scope: &mut S,
stream: Stream<S, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections<S>) {
fn construct_timely<'scope>(
scope: Scope<'scope, Duration>,
stream: Stream<'scope, Duration, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections<'scope>) {
type OpUpdate = ((usize, String, Vec<usize>), Duration, i64);
type ChUpdate = ((usize, Vec<usize>, (usize, usize), (usize, usize)), Duration, i64);
type ElUpdate = (usize, Duration, i64);
type MsgUpdate = (usize, Duration, i64);

let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope.clone());
let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope);
let mut input = demux.new_input(stream, Pipeline);

let (op_out, operates) = demux.new_output::<Vec<OpUpdate>>();
Expand Down Expand Up @@ -536,24 +534,24 @@ fn construct_timely<S: Scope<Timestamp = Duration>>(
// ============================================================================

/// Internal: collections before arrangement, used for the cross-join.
struct DifferentialCollections<S: Scope> {
arrangement_batches: VecCollection<S, usize, i64>,
arrangement_records: VecCollection<S, usize, i64>,
sharing: VecCollection<S, usize, i64>,
batcher_records: VecCollection<S, usize, i64>,
batcher_size: VecCollection<S, usize, i64>,
batcher_capacity: VecCollection<S, usize, i64>,
batcher_allocations: VecCollection<S, usize, i64>,
struct DifferentialCollections<'scope> {
arrangement_batches: VecCollection<'scope, Duration, usize, i64>,
arrangement_records: VecCollection<'scope, Duration, usize, i64>,
sharing: VecCollection<'scope, Duration, usize, i64>,
batcher_records: VecCollection<'scope, Duration, usize, i64>,
batcher_size: VecCollection<'scope, Duration, usize, i64>,
batcher_capacity: VecCollection<'scope, Duration, usize, i64>,
batcher_allocations: VecCollection<'scope, Duration, usize, i64>,
}

/// Build differential logging collections and arrangements.
fn construct_differential<S: Scope<Timestamp = Duration>>(
scope: &mut S,
stream: Stream<S, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections<S>) {
fn construct_differential<'scope>(
scope: Scope<'scope, Duration>,
stream: Stream<'scope, Duration, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections<'scope>) {
type Update = (usize, Duration, i64);

let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone());
let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope);
let mut input = demux.new_input(stream, Pipeline);

let (bat_out, batches) = demux.new_output::<Vec<Update>>();
Expand Down
9 changes: 4 additions & 5 deletions differential-dataflow/examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() {
let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let batch: usize = 10_000;

// This computation demonstrates in-place accumulation of arbitrarily large
// This computation demonstrates in-place accumulation of arbitrarily large
// volumes of input data, consuming space bounded by the number of distinct keys.
timely::execute_from_args(std::env::args().skip(2), move |worker| {

Expand All @@ -17,11 +17,10 @@ fn main() {
let mut input = worker.dataflow::<(), _, _>(|scope| {
let (input, data) = scope.new_collection::<_, isize>();

use timely::dataflow::Scope;
scope.iterative::<u32,_,_>(|inner| {
data.enter_at(inner, |_| 0)
.consolidate()
.leave()
.leave(scope)
});

input
Expand All @@ -41,7 +40,7 @@ fn main() {
}
counter += batch;

worker.step();
worker.step();
let elapsed = timer.elapsed();

if elapsed.as_secs() as usize > last_sec {
Expand All @@ -54,4 +53,4 @@ fn main() {
}

}).unwrap();
}
}
7 changes: 3 additions & 4 deletions differential-dataflow/examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::operators::*;
use timely::order::Product;
use timely::scheduling::Scheduler;

use differential_dataflow::input::Input;
use differential_dataflow::AsCollection;
Expand Down Expand Up @@ -109,8 +108,8 @@ fn main() {
// repeatedly update minimal distances each node can be reached from each root
roots.clone().iterate(|scope, dists| {

let edges = edges.enter(&scope);
let roots = roots.enter(&scope);
let edges = edges.enter(scope);
let roots = roots.enter(scope);

dists.arrange_by_key()
.join_core(edges, |_k,l,d| Some((*d, l+1)))
Expand Down Expand Up @@ -175,4 +174,4 @@ fn main() {
}
}
}).unwrap();
}
}
9 changes: 4 additions & 5 deletions differential-dataflow/examples/bfs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
Expand Down Expand Up @@ -91,18 +90,18 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge>, roots: VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
T: timely::progress::Timestamp + Lattice + Ord,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));

// repeatedly update minimal distances each node can be reached from each root
nodes.clone().iterate(|scope, inner| {

let nodes = nodes.enter(&scope);
let edges = edges.enter(&scope);
let nodes = nodes.enter(scope);
let edges = edges.enter(scope);

inner.join_map(edges, |_k,l,d| (*d, l+1))
.concat(nodes)
Expand Down
Loading