timely-util: columnar-native merge-batcher (chunker + merger + bench) #36408
DAlperin wants to merge 15 commits into MaterializeInc:main
Conversation
antiguru left a comment
Very cool! I left some comments inline, but overall this points in the right direction.
```rust
// inlines through the leaf chain; tracking a local counter
// instead measured slower, presumably because the extra
// updates added register pressure to the hot loop.
let target = timely::container::buffer::default_capacity::<(D, T, R)>();
```
We shouldn't be using the default capacity here (or ever, really). Instead, I'd copy the heuristic the column builder encodes: If we're within 10% of 2MiB (or a configurable constant, const), we ship the batch. Question is whether we should serialize or not, but I lean toward "yes".
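For concreteness, a minimal sketch of that heuristic — `SHIP_BUDGET_BYTES` and `should_ship` are made-up names, and the real column builder encodes its own variant:

```rust
/// Ship once the chunk's serialized size is within 10% of a fixed budget,
/// instead of comparing record counts against a default capacity.
const SHIP_BUDGET_BYTES: usize = 2 << 20; // 2 MiB

fn should_ship(serialized_bytes: usize) -> bool {
    // "Within 10% of 2 MiB": ship at >= 90% of the budget.
    serialized_bytes * 10 >= SHIP_BUDGET_BYTES * 9
}
```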
Do you mean serialize on the boundaries? It's trivial enough to serialize in extract, but it will just flow directly into a trace builder, which will probably have to decode it from bytes again right away
```rust
let (d1, t1, r1) = left_borrow.get(left_pos[0]);
let (d2, t2, r2) = right_borrow.get(right_pos[0]);
```
What helped in the consolidating column builder: Use the fact that (D, T, R)::Container is (D::Container, T::Container, R::Container), which allows us to construct the result using three separate containers, as long as the length matches. Especially with galloping, writing runs of values to a single container enables vectorization.
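A sketch of that shape, with `Vec` standing in for the real leaf containers:

```rust
/// Write a galloped run as three independent slice extends, one per leaf,
/// instead of one `(D, T, R)` push per record. Illustrative only: the real
/// leaves are `(D::Container, T::Container, R::Container)`, not `Vec`s.
fn copy_run<D: Clone, T: Clone, R: Clone>(
    src: (&[D], &[T], &[R]),
    run: std::ops::Range<usize>,
    dst: (&mut Vec<D>, &mut Vec<T>, &mut Vec<R>),
) {
    // Three contiguous extends the compiler can vectorize; the leaves stay
    // length-aligned because each record contributes one element to each.
    dst.0.extend_from_slice(&src.0[run.clone()]);
    dst.1.extend_from_slice(&src.1[run.clone()]);
    dst.2.extend_from_slice(&src.2[run]);
}
```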
```rust
tuple_leaf_reserve!(A a, B b, C c);
tuple_leaf_reserve!(A a, B b, C c, D d);

impl<C: Columnar> SizableContainer for Column<C>
```
If at all possible, we shouldn't implement SizableContainer for Column. It's legacy, and each use is a smell.
TimelyDataflow/differential-dataflow#738 would allow us to remove the requirement.
```rust
/// fresh chunk's leaf Vecs filling up. Without it, each chunk's leaves grow
/// from zero by doubling, and the cumulative realloc traffic shows up as
/// allocator memmove cost on the hot path.
pub trait LeafReserve {
```
I think if we switch from the default capacity to serialized capacity, we don't need this helper trait anymore. We'd build into one owned copy, and to ship it we'd serialize into a new one and clear. After the first round, we should have appropriate capacity, so we don't have to worry about pre-sizing.
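A sketch of that build/ship cycle, with simplified types and a hypothetical `serialize` standing in for the columnar byte encoding:

```rust
/// Build into one owned buffer; to ship, serialize into a fresh allocation
/// and `clear()` the owned one. `clear` keeps the allocation, so after the
/// first round every cycle starts at full capacity — no pre-sizing trait.
struct Builder {
    owned: Vec<(u64, u64, i64)>,
}

impl Builder {
    fn push(&mut self, record: (u64, u64, i64)) {
        self.owned.push(record);
    }

    fn ship(&mut self) -> Vec<u8> {
        let bytes = serialize(&self.owned);
        self.owned.clear(); // retains capacity for the next round
        bytes
    }
}

fn serialize(records: &[(u64, u64, i64)]) -> Vec<u8> {
    // Stand-in for the real columnar serialization.
    let mut out = Vec::with_capacity(records.len() * 24);
    for (d, t, r) in records {
        out.extend_from_slice(&d.to_le_bytes());
        out.extend_from_slice(&t.to_le_bytes());
        out.extend_from_slice(&r.to_le_bytes());
    }
    out
}
```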
```rust
}

fn bench_merge(c: &mut Criterion) {
    let n = 10_000usize;
```
It might help to define n not in terms of elements but of target size in bytes, and then scale it for specific heap sizes.
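For instance (illustrative sizes; the record layout is the bench's `((u64, u64), u64, i64)`):

```rust
/// Derive the per-side element count from a target heap footprint, so one
/// bench definition sweeps cache-resident and DRAM-resident regimes.
type Record = ((u64, u64), u64, i64); // 32 bytes per record

fn records_for(target_bytes: usize) -> usize {
    target_bytes / std::mem::size_of::<Record>()
}

fn main() {
    // e.g. L1-resident through DRAM-resident per-side footprints
    for target in [32 << 10, 512 << 10, 8 << 20, 128 << 20] {
        println!("{target} bytes -> n = {}", records_for(target));
    }
}
```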
def- left a comment
Column<(D,T,R)>::extract (src/timely-util/src/columnar/batcher.rs:548) violates the InternalMerge::extract contract: it never yields mid-buffer, so the framework can't flush keep/ship until the input is fully drained. Other impls (Vec, ColumnationStack) gate their inner loop on !keep.at_capacity() && !ship.at_capacity().
Output chunks overshoot the ship threshold by up to one full input buffer. Not sure how bad that is, but could waste a bunch of memory?
Small test:
```diff
diff --git a/src/timely-util/src/columnar/batcher.rs b/src/timely-util/src/columnar/batcher.rs
index 600b14c4c3..f39b42f846 100644
--- a/src/timely-util/src/columnar/batcher.rs
+++ b/src/timely-util/src/columnar/batcher.rs
@@ -926,4 +926,43 @@ mod proptests {
             prop_assert!(frontier.elements().is_empty());
         }
     }
+
+    proptest! {
+        #![proptest_config(ProptestConfig { cases: 16, ..ProptestConfig::default() })]
+
+        #[mz_ore::test]
+        #[cfg_attr(miri, ignore)]
+        fn extract_yields_when_keep_at_capacity(upper_time in 0u64..=4) {
+            use timely::container::SizableContainer;
+
+            let mut keep: Column<Tuple> = Default::default();
+            let mut k = 0u64;
+            while !keep.at_capacity() {
+                keep.push_into(((k, 0u64), upper_time, 1i64));
+                k += 1;
+            }
+
+            let mut self_col: Column<Tuple> = Default::default();
+            for i in 0..32u64 {
+                self_col.push_into(((i, 1u64), upper_time, 1i64));
+            }
+            let len = self_col.borrow().len();
+
+            let upper = Antichain::from_elem(upper_time);
+            let mut frontier: Antichain<u64> = Antichain::new();
+            let mut ship: Column<Tuple> = Default::default();
+            let mut position = 0;
+
+            InternalMerge::extract(
+                &mut self_col,
+                &mut position,
+                upper.borrow(),
+                &mut frontier,
+                &mut keep,
+                &mut ship,
+            );
+
+            prop_assert!(position < len, "position={}, len={}", position, len);
+        }
+    }
 }
```
@def- yep good point, will fix tomorrow
```rust
// and ships it regardless.
let at_ship_threshold =
    |sd: &D::Container, st: &T::Container, sr: &R::Container| {
        use columnar::AsBytes as _;
```
we already have a function for this
```rust
pub type ColumnMerger<D, T, R> = InternalMerger<Column<(D, T, R)>>;

/// `InternalMerge` for [`Column`]-shaped sorted chunks.
impl<D, T, R> InternalMerge for Column<(D, T, R)>
```
Apparently InternalMerge is supposed to go away. Let's implement Merger directly?
Replace the columnation detour in the merge-batcher path with all-Column plumbing. This commit adds the input-side chunker and the InternalMerge impl; the seal builder and rewiring Col2ValBatcher are follow-ups, so the legacy Chunker / ColInternalMerger path remains active.

* ColumnChunker<U> — same sort + consolidate loop as Chunker<C> but output stays in Column<U>, no ColumnationStack round-trip. Reuses owned scratch slots across pushes.
* SizableContainer for Column<C> — at_capacity left unimplemented!() (capacity policy deferred); ensure_capacity is a debug_assert! since merger-internal chunks are always Column::Typed.
* InternalMerge for Column<(D, T, R)> — 0/1/2-input merge with the empty-self bulk-swap shortcut and Container::extend_from_self for the Less/Greater arms. Equal-key branch consolidates diffs in a reusable R accumulator and drops zero sums. extract partitions by frontier in a single pass (no mid-call yielding, matching DD's columnar-trie path).
* ColumnMerger<D, T, R> type alias for InternalMerger<Column<(D, T, R)>>.
* Proptests covering merge_from and extract against a brute-force reference; verified by injecting a missing-frontier-insert mutation.

Naive merge for now; galloping the Less/Greater runs is a follow-up.
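For orientation, the chunker's sort + consolidate sweep in miniature, over owned `(data, time, diff)` triples — the real `ColumnChunker` works over `Column` borrows and reuses scratch slots:

```rust
/// Sort by (data, time), sum the diffs of equal keys, and drop zero sums.
fn sort_consolidate<D: Ord, T: Ord>(mut input: Vec<(D, T, i64)>) -> Vec<(D, T, i64)> {
    input.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
    let mut out: Vec<(D, T, i64)> = Vec::with_capacity(input.len());
    for (d, t, r) in input {
        match out.last_mut() {
            // Equal (data, time): consolidate into the previous record.
            Some(last) if last.0 == d && last.1 == t => last.2 += r,
            _ => out.push((d, t, r)),
        }
        // Records whose diffs cancel to zero don't survive consolidation.
        if out.last().is_some_and(|last| last.2 == 0) {
            out.pop();
        }
    }
    out
}
```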
Builds on the parent commit (gallop view-projection + length-1 push) with two more optimizations and a microbenchmark for end-to-end measurement.

* **Pre-allocation via `LeafReserve` + mid-merge yield** — adds a `LeafReserve` trait (impl'd for `Vec<T>` and tuples) used by `Column::ensure_capacity` to pre-reserve `default_capacity` records per leaf on each fresh chunk, avoiding the cumulative geometric Vec-grow cost. Adds a matching `borrow().len() < target` check in `merge_from` so the loop yields back to the merge driver at the chunk boundary instead of running to one input's exhaustion and overshooting the reservation.
* **Gallop bypass for length-1 runs** — the common case on interleaved data is a 1-record advance. Push directly + advance, then peek at the next record; only enter `gallop` (and the bulk-copy branch) when an actual run is detected. Skips per-iteration `gallop` setup + the first cmp probe for runs of length 1.

Microbench in `benches/columnar_merger.rs` drives a 2-input merge through `InternalMerger::merge` over three regimes — mixed (interleaved keys), collisions (small key range, equal-key consolidation), disjoint (non-overlapping ranges, long runs). Wall-time on `((u64, u64), u64, i64)` data with n=10K per side, cumulative across this commit and its parent:

| config     | columnation | column initial | column final | improvement |
|------------|-------------|----------------|--------------|-------------|
| mixed      | 27.9 µs     | 204 µs         | 138.9 µs     | -32%        |
| collisions | 28.0 µs     | 220 µs         | 147.0 µs     | -33%        |
| disjoint   | 14.8 µs     | 6.3 µs         | 6.5 µs       | ~unchanged  |

Column is now ~2.2x faster than columnation on disjoint (long-run-heavy) data where galloping bulk-copy wins, and ~5x slower on dense primitive-keyed mixed/collisions data — that residual gap is the parallel-array layout cost (4 leaf Vecs, 4x per-leaf trait dispatch per record) vs ColumnationStack's single flat slice. Closing it further needs either a different wire shape (trie) or variable-length keys (Row), where per-record overhead amortizes.

Adds a TODO in the bench for a Row-keyed variant; gated on `Rows` (the columnar container for `Row`) being reachable from a dev-dep, currently private to a module in `mz-repr`.
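The bypass itself is small; a hedged sketch over plain slices, with `partition_point` standing in for the crate's `gallop`:

```rust
/// Advance one record directly, then only pay run-detection setup when the
/// *next* record also sorts before the other side's head.
fn push_with_run_bypass<T: Ord + Copy>(
    left: &[T],
    pos: &mut usize,
    right_head: &T,
    out: &mut Vec<T>,
) {
    out.push(left[*pos]);
    *pos += 1;
    // Common case on interleaved data: the run has length 1, so skip the
    // search setup entirely and return to the main compare loop.
    if *pos < left.len() && left[*pos] < *right_head {
        let run = left[*pos..].partition_point(|x| x < right_head);
        out.extend_from_slice(&left[*pos..*pos + run]); // bulk-copy the run
        *pos += run;
    }
}
```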
…eaf gallop writes
- Drop the `copy_from(&mut owned_*) → push(&owned_*)` round-trip for D and T in the `Equal` branch; refs from the input borrow are valid through the merge step, so push them via each leaf's `Push<Ref<_>>` impl directly. The original shape triggered `Row::copy_from = CompactBytes::clear() + extend_from_slice` per equal-key record, which freed and re-mallocated every row that overflowed the inline budget. On Row workloads this was the largest hot spot in the profile.
- Pre-size each output leaf via `Container::reserve_for([left_borrow, right_borrow])` so push paths don't pay geometric `grow_amortized + realloc`. Skipped past a record-count threshold to avoid over-reserving on huge inputs where consolidation makes the input bound a 4× over-estimate.
- Amortize the in-loop `at_ship_threshold` check: bumped from per-iteration to every 1024 iterations via a power-of-two mask. The closure walks 4–5 leaf slices per call, which is non-trivial on variable-length leaves like `Rows`; overshooting the ship boundary by up to a thousand records before noticing has no practical impact — the outer driver's `at_capacity` ships the chunk regardless.
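The amortization is a mask on the loop counter; a minimal sketch with illustrative closures:

```rust
/// Run `step` until the input is exhausted, evaluating the (expensive)
/// threshold predicate only once every 1024 iterations.
fn merge_loop(mut step: impl FnMut() -> bool, mut at_threshold: impl FnMut() -> bool) -> bool {
    const CHECK_MASK: usize = 1024 - 1; // power-of-two mask
    let mut iters = 0usize;
    while step() {
        iters += 1;
        // Overshooting the ship boundary by < 1024 records is harmless: the
        // outer driver's `at_capacity` ships the chunk regardless.
        if iters & CHECK_MASK == 0 && at_threshold() {
            return true; // threshold fired mid-merge
        }
    }
    false // input exhausted
}
```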
…er InternalMerge contract

- The chunker's `copy_from(&mut owned_*) → push(&owned_*)` round-trip for D and T triggered `Row::copy_from = CompactBytes::clear() + extend_from_slice` per output record, freeing and re-mallocating every row that overflowed the inline budget. Refs from the input borrow are valid through the sweep, so push them via each leaf's `Push<Ref<_>>` impl directly. Same fix as `merge_from`'s Equal branch from the previous commit. R still uses an owned scratch since it carries the consolidated sum.
- `extract` now yields when either output buffer reaches the ship threshold. Required by `InternalMerge::extract`'s contract: the framework only checks `at_capacity` between calls, so without an inner-loop yield a single call could fill an output buffer well past threshold.
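The contract-conforming shape, simplified — plain `Vec`s and a capacity bound stand in for the real containers and ship threshold, and a single time stands in for the frontier:

```rust
/// Partition records from `position` onward against `upper`, but yield as
/// soon as either output fills; the caller resumes from `position`.
fn extract_step<T: Ord + Copy>(
    input: &[(u64, T)],
    position: &mut usize,
    upper: &T,
    keep: &mut Vec<(u64, T)>,
    ship: &mut Vec<(u64, T)>,
    cap: usize,
) {
    while *position < input.len() && keep.len() < cap && ship.len() < cap {
        let (d, t) = input[*position];
        if t < *upper {
            ship.push((d, t)); // not beyond the frontier: ready to ship
        } else {
            keep.push((d, t)); // beyond the frontier: retained for later
        }
        *position += 1;
    }
}
```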
The InternalMerger wrapper is being removed upstream. Replace the `pub type ColumnMerger<D, T, R> = InternalMerger<Column<(D, T, R)>>` type alias with a phantom-marker struct that implements diff-dataflow's `Merger` trait directly.

The per-chunk merge_from / extract logic stays in this module, moved from the dropped `InternalMerge` impl onto inherent methods on `Column<(D, T, R)>`. `ColumnMerger::merge` and `::extract` orchestrate those over chains of chunks (drain pairs in lockstep, ship full results, recycle inputs via stash) — a specialized port of the previous generic `InternalMerger` orchestration. Proptests now call the inherent methods directly.

No behaviour change: same merge/consolidate/extract semantics, same `SizableContainer::at_capacity` ship signal, same gallop / pre-reserve / amortized threshold-check optimizations.
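The replacement shape in skeleton form — trait methods elided, since differential-dataflow's `Merger` defines the real signatures:

```rust
use std::marker::PhantomData;

/// Phantom-marker merger: no state of its own. The per-chunk logic lives in
/// inherent methods on `Column<(D, T, R)>`; this type only carries the type
/// parameters needed to implement the `Merger` trait directly.
pub struct ColumnMerger<D, T, R> {
    _marker: PhantomData<(D, T, R)>,
}

impl<D, T, R> Default for ColumnMerger<D, T, R> {
    fn default() -> Self {
        Self { _marker: PhantomData }
    }
}
```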
Two optimizations in the merge driver, unlocked by owning Merger directly:

1. Whole-chunk passthrough in `Merger::merge`. When one head's tail (from its current position) is sortable-before the other head's current record, the entire tail is appended to `output` via `mem::replace` — no per-record compares, no per-leaf `extend_from_self` byte copies. Two probes per outer iteration gate the fast path; the `<` boundary preserves the strict-sort invariant across output chunks (so equal-key consolidation still runs through `merge_from` as before).
2. `Column::merge_from` returns `bool`. `true` signals the inner amortized ship-threshold check fired; the outer loop ships without redoing the `at_capacity` walk on the result's leaf bytes. `false` (input-exhausted path) still falls through to the existing `at_capacity` check to catch overshoot accumulation across short calls.

Three new tests in addition to the existing 15 (10 hand-written + 5 proptests), driving `Merger::merge` directly:

- `merger_disjoint_chains_passthrough` — disjoint ranges, every chunk hits the fast path.
- `merger_interleaved_chains` — alternating keys, fast path never fires, exercises the bool-return path.
- `merger_passthrough_respects_equal_boundary` — chain1.last == chain2.first; passthrough must NOT fire (would split equal keys across output chunks unconsolidated).

Perf — full `MergeBatcher` Row push + seal, `mz-compute/benches/columnar_merge_batcher_row.rs`. Column path percent-change vs the `Merger`-direct baseline (criterion `--baseline before`):

- mixed/8M: +20% throughput (-16.5% time)
- collisions/8M: +6% (-5.7%)
- disjoint/8M: +7% (-6.8%)
- mixed/512K: +1% (within noise)
- collisions/512K: +7% (-6.7%)
- disjoint/512K: +3% (-2.4%)

32K sizes within noise — passthrough barely fires when the chain is one or two chunks. 8M is where the wins concentrate: chains have ~125 chunks each, so the passthrough fast path applies many times per merge, and the cumulative byte-copy savings show through end-to-end.
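The passthrough gate in miniature, with `Vec<T>` standing in for a `Column` chunk (the real version passes through the tail from the head's current position, not necessarily the whole chunk):

```rust
/// If the head chunk's last record sorts strictly before the other side's
/// current record, move the whole chunk to the output with no per-record
/// work. The strict `<` keeps equal keys flowing through the record-level
/// merge, so they still consolidate within one output chunk.
fn try_passthrough<T: Ord>(head: &mut Vec<T>, other_first: &T, output: &mut Vec<Vec<T>>) -> bool {
    match head.last() {
        Some(last) if last < other_first => {
            output.push(std::mem::take(head)); // whole-chunk move, no copies
            true
        }
        _ => false,
    }
}
```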
Clears `clippy::as_conversions` across the columnar merger benches
and one proptest. Conversions split between three idioms:
- `CastFrom` for widening integer casts (`u64::cast_from(usize)`,
`usize::cast_from(u32)`).
- `CastLossy` for f64-from-integer (the throughput formatters
that convert byte counts to GiB/s).
- `ReinterpretCast` for u64↔i64, used by Row keys round-tripped
through `Datum::Int64`.
Also drops a needless `Some(...?)` wrapper on the bench's
`config_bytes` helper.
Adds a `sorted` regime to the e2e MergeBatcher Row bench, modeling the persist-hydration shape: one globally sorted+consolidated dataset of `n_total` records, sliced into contiguous rounds whose key ranges are non-overlapping with their neighbors. This is what compute sees when reading a sorted snapshot from persist — each on-the-wire batch is a tile of the same global sort order — and it's the regime where the column path's whole-chunk passthrough fast path fires on every chain compaction.

Head-to-head e2e MergeBatcher results — Row, push 64 rounds + final seal, throughput in payload bytes:

┌────────────┬───────────────────────┬───────────────────────┬───────────────────────┐
│ Config     │ columnation           │ column                │ column vs columnation │
├────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ sorted/32K │ 2.32 GiB/s (78.8 µs)  │ 2.54 GiB/s (72.1 µs)  │ 1.09× faster          │
│ sorted/512K│ 1.49 GiB/s (490.3 µs) │ 2.52 GiB/s (290.9 µs) │ 1.69× faster          │
│ sorted/8M  │ 1003 MiB/s (11.97 ms) │ 2.32 GiB/s (5.04 ms)  │ 2.37× faster          │
└────────────┴───────────────────────┴───────────────────────┴───────────────────────┘

The advantage grows with size: longer chains mean more chain-compaction merges, each of which fires the whole-chunk passthrough and skips the per-record cmp + per-leaf byte copy that columnation can't avoid. At 8 MiB per side both paths drop into DRAM bandwidth, where saving the merge's byte movement shows up cleanly.

For reference, the same column path on `mixed/8M` (random keys) lands at 338 MiB/s — sorted-shape data is ~7× faster on the same code, 2.3× faster than columnation on the same regime.
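The data shape in miniature — keys are bare `u64`s here where the bench uses Rows, and the helper name is illustrative:

```rust
/// One globally sorted dataset sliced into contiguous rounds with
/// non-overlapping key ranges: each round models one on-the-wire batch
/// read from a sorted persist snapshot.
fn sorted_rounds(n_total: usize, rounds: usize) -> Vec<Vec<u64>> {
    let keys: Vec<u64> = (0..n_total)
        .map(|i| u64::try_from(i).expect("fits in u64"))
        .collect();
    let per_round = n_total.div_ceil(rounds).max(1);
    keys.chunks(per_round).map(|chunk| chunk.to_vec()).collect()
}
```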
Mostly comment cleanup left over from the move from mz-repr to
mz-compute and from dropping the InternalMerge trait impl in favour
of a direct Merger impl. No behaviour change.
- Bench doc headers no longer claim to live in mz-repr.
- `columnar_merger.rs`'s top-level TODO retired (the Row-keyed
variant exists now in mz-compute).
- Stale `InternalMerger::merge` reference in the primitive bench's
  doc now points at the trait method actually called.
- Leftover commented-out `C2` trait bound in `Chunker`'s
`PushInto` impl removed.
- `ensure_capacity`'s rationale rewritten — `SizableContainer`
exists for our own `at_capacity` calls in `Merger::merge`
orchestration, not because of a defunct `InternalMerge`
supertrait.
Also two small semantic fixes:
- `Merger::account` now `expect("record_count is non-negative")`
instead of `unwrap_or(0)`, matching upstream
`InternalMerge::account`'s default. A negative `record_count` is
a bug, and silently zeroing it would defeat the telemetry that
`account` exists for.
- `merge_from`'s `n => unimplemented!("… {n} inputs")` arm became
`unreachable!("merge_from called with {n} inputs; expected 0, 1,
or 2")` — `Merger::merge` only ever passes 0/1/2 chunks, and
`unimplemented` implied a feature gap rather than the defensive
guard it actually is.
- Two `(a + b - 1) / b` ceil-divs in the e2e bench replaced with
`a.div_ceil(b)`, matching the existing usage in the same file.
Summary

Adds an all-`Column` merge-batcher path: an input-side chunker (`ColumnChunker`), an `InternalMerge` impl on `Column<(D, T, R)>` plugged into DD's standard `InternalMerger` via the `ColumnMerger` type alias, optimizations for the inner loop and the chunker, and three microbenchmarks (primitive keys + Row-keyed merger + Row-keyed full `MergeBatcher`). The legacy columnation-backed `Chunker`/`ColInternalMerger` path is unchanged; this PR is purely additive.
Commits
- `ColumnChunker` + `InternalMerge for Column<(D,T,R)>` + `ColumnMerger` type alias. Naive 0/1/2-input merge with bulk-swap shortcut, equal-key consolidation via a reusable `R` accumulator. Proptests covering `merge_from`/`extract` against a brute-force reference.
- Per-leaf gallop writes, replacing `extend_from_self`'s per-leaf 1-element memcpy.
- `LeafReserve` pre-allocation + mid-merge yield in `merge_from`. Gallop bypass for length-1 runs (the common case on interleaved data). Microbench in `benches/columnar_merger.rs` comparing `ColumnMerger` against `ColInternalMerger`.
- Replace the `default_capacity`-driven yield with `ColumnBuilder`'s serialized-words ship heuristic. Drop the `LeafReserve` trait + macro; `SizableContainer for Column<C>` is now minimal (required as a supertrait of `InternalMerge`).
- Per-leaf inner-loop pushes/extends — destructure `(D, T, R)::Container` once, dispatch to `sd`/`st`/`sr` directly so the gallop bulk-copy resolves to three independent `extend_from_slice` calls the compiler can autovectorize.
- `mz-timely-util` primitive bench parameterized on per-side heap footprint; sweeps four sizes from L1-resident (32 KiB) to DRAM-resident (128 MiB) so the curves' cache crossovers are visible. Throughput in bytes (cross-record-shape comparable).
- Data = `Row` — the variable-length payload regime where parallel-array layout is expected to amortize per-leaf overhead. Comparison table after `group.finish()` — pulls medians from criterion's `estimates.json` so we don't add a `serde_json` dev-dep.
- Drop the `copy_from + push(&owned)` round-trip for D and T in the `Equal` branch. Refs from the input borrow are valid through the merge step, so push them via each leaf's `Push<Ref<_>>` impl directly. The original shape triggered `Row::copy_from = CompactBytes::clear() + extend_from_slice` per equal-key record, which freed and re-malloc'd every row that overflowed the inline budget.
- `Container::reserve_for([left_borrow, right_borrow])` in `merge_from`, gated on a record-count threshold to avoid over-reserving on huge inputs where consolidation makes the input bound a 4× over-estimate.
- Amortized `at_ship_threshold` check — every 1024 iterations via a power-of-two mask. The closure walks 4–5 leaf slices on variable-length leaves; overshooting the ship boundary by ~1 K records has no practical impact since the framework's outer `at_capacity` ships the chunk regardless.
- `copy_from` fix to `ColumnChunker` — the chunker had the same Row-malloc thrash as the merger's Equal branch on every consolidated output record. Add the `extract` ship-threshold yield required by `InternalMerge::extract`'s contract: the framework only checks `at_capacity` between calls, so without an inner-loop yield a single call could fill an output well past threshold.
- Move the Row benches from `mz-repr` to `mz-compute` and add a full-`MergeBatcher` Row bench that exercises the entire push-rounds + seal cycle (chunker + chain compaction + extract) — companion to the merger-only bench.
Perf — primitive `((u64, u64), u64, i64)`

`mz-timely-util/benches/columnar_merger.rs`. Three regimes × four per-side heap-footprint targets, measured on Apple Silicon.

Perf — Row-keyed `(Row, u64, i64)`, `Merger::merge` only

`mz-compute/benches/columnar_merger_row.rs`. Each Row is `(Int64, String)` with a 24-character hex-string suffix that overflows `CompactBytes`'s inline budget — so columnation actually exercises its `LgAllocRegion` copy path and column packs into its `Rows` value buffer. 32 KiB / 512 KiB / 8 MiB per side; 128 MiB skipped because Row-heavy setup costs would dominate.

Perf — Row-keyed full `MergeBatcher` push + seal

`mz-compute/benches/columnar_merge_batcher_row.rs`. Drives the entire batcher cycle for both the legacy (`Chunker<ColumnationStack>` + `ColInternalMerger`) and column (`ColumnChunker` + `ColumnMerger`) paths: 64 push-rounds of 4 K records each, then a final `seal` so the chunks flow through `Merger::merge` (chain compaction during push + final merge) and `Merger::extract`. Same regimes/sizes as the merger bench.
Reading the curves

Row workloads with galloping room are where column wins decisively. Headline: `Merger::merge` Row/disjoint/8 MiB tops out at 28.94 GiB/s vs columnation's 4.09 GiB/s — 7.07× faster. That's the merge-batcher's actual hot path for hydration: pre-sorted batches from persist composing into larger sorted runs.

Row + variable-length payload closes the per-leaf push tax. The merger bench shows column tied or winning on every Row config; the only sub-1.0× ratios are within ~5%, well inside thermal noise on Apple Silicon. This is the regime that matters for compute arrangements over real data.

Primitives are a structural-floor regime for `Column`. A 24-byte record costs 4 leaf pushes per record vs columnation's single flat-slice push; the per-leaf dispatch cost is fully exposed when the working set fits in cache (worst gap 5.78× at 512 KiB collisions). Galloping (disjoint) still wins decisively at every cache-resident size (peak 4.21× at 8 MiB), where column's per-leaf `extend_from_self` is the right primitive for long runs.

The full-`MergeBatcher` bench tightens the 8M gap. End-to-end the column path is at-or-better than columnation on every cache-resident size (32 KiB / 512 KiB) and within 1.04–1.14× at 8 MiB. The 8M residual is the structural per-leaf push tax surfacing in `merge_from` cascades on interleaved keys; cache-resident regimes don't see it because chunker work dominates and the chunker is now competitive.

The 128 MiB primitive `disjoint` slowdown (1.45× slower) is one bandwidth-pathological case: a single ~128 MiB memcpy where columnation's one-slice copy edges out column's three independent leaf copies. Run-to-run noise on Apple Silicon is high here — the number swings 15–41 GiB/s across runs without code changes. Not a regime real workloads hit; chunks are ship-threshold-bounded at 2 MiB.
Test plan

- `cargo nextest run -p mz-timely-util columnar` — 15/15 pass (10 hand-written + 5 proptests).
- `cargo bench -p mz-timely-util --bench columnar_merger` — produces the primitive table above.
- `cargo bench -p mz-compute --bench columnar_merger_row` — produces the Row merger table.
- `cargo bench -p mz-compute --bench columnar_merge_batcher_row` — produces the full-batcher table.