
timely-util: columnar-native merge-batcher (chunker + merger + bench) #36408

Open
DAlperin wants to merge 15 commits into MaterializeInc:main from DAlperin:dov/columnar-merger

Conversation


@DAlperin DAlperin commented May 5, 2026

Summary

Adds an all-Column merge-batcher path: an input-side chunker
(ColumnChunker), an InternalMerge impl on Column<(D, T, R)>
plugged into DD's standard InternalMerger via the ColumnMerger type
alias, optimizations for the inner loop and the chunker, and three
microbenchmarks (primitive keys + Row-keyed merger + Row-keyed full
MergeBatcher).

The legacy columnation-backed Chunker / ColInternalMerger path is
unchanged; this PR is purely additive.

Commits

  1. ColumnChunker + InternalMerge for Column<(D,T,R)> + ColumnMerger type alias. Naive 0/1/2-input merge with bulk-swap shortcut, equal-key consolidation via a reusable R accumulator. Proptests covering merge_from / extract against a brute-force reference. (A simplified sketch of the merge loop follows this list.)
  2. Galloping for Less/Greater runs — view-projection so the gallop predicate touches only the D and T leaves; length-1 push fast path that skips extend_from_self's per-leaf 1-element memcpy.
  3. Mid-merge yield in merge_from. Gallop bypass for length-1 runs (the common case on interleaved data). Microbench in benches/columnar_merger.rs comparing ColumnMerger against ColInternalMerger.
  4. Address review: replace the default_capacity-driven yield with ColumnBuilder's serialized-words ship heuristic. Drop the LeafReserve trait + macro; SizableContainer for Column<C> is now minimal (required as a supertrait of InternalMerge). Per-leaf inner-loop pushes/extends — destructure (D, T, R)::Container once, dispatch to sd/st/sr directly so the gallop bulk-copy resolves to three independent extend_from_slice calls the compiler can autovectorize.
  5. mz-timely-util primitive bench parameterized on per-side heap footprint; sweeps four sizes from L1-resident (32 KiB) to DRAM-resident (128 MiB) so the curves' cache crossovers are visible. Throughput is reported in bytes, so results are comparable across record shapes.
  6. Row-keyed bench mirroring the same regimes/sizes with Data = Row — the variable-length payload regime where parallel-array layout is expected to amortize per-leaf overhead.
  7. Both benches emit a side-by-side throughput summary table (GiB/s and latency, with the column-vs-columnation ratio) after group.finish(); the medians come from criterion's estimates.json, avoiding a serde_json dev-dep.
  8. Inner-loop optimizations driven by xctrace profiling of the slowest Row regimes:
    • Drop the copy_from + push(&owned) round-trip for D and T in the Equal branch. Refs from the input borrow are valid through the merge step, so push them via each leaf's Push<Ref<_>> impl directly. The original shape triggered Row::copy_from = CompactBytes::clear() + extend_from_slice per equal-key record, which freed and re-malloc'd every row that overflowed the inline budget.
    • Pre-size output leaves via Container::reserve_for([left_borrow, right_borrow]) in merge_from, gated on a record-count threshold to avoid over-reserving on huge inputs where consolidation makes the input bound a 4× over-estimate.
    • Amortize the in-loop at_ship_threshold check — every 1024 iterations via a power-of-two mask. The closure walks 4–5 leaf slices on variable-length leaves; overshooting the ship boundary by ~1 K records has no practical impact since the framework's outer at_capacity ships the chunk regardless.
  9. Apply the same copy_from fix to ColumnChunker — the chunker had the same Row-malloc thrash as the merger's Equal branch on every consolidated output record. Add extract ship-threshold yield required by InternalMerge::extract's contract: the framework only checks at_capacity between calls, so without an inner-loop yield a single call could fill an output well past threshold.
  10. Move Row benches from mz-repr to mz-compute and add a full-MergeBatcher Row bench that exercises the entire push-rounds + seal cycle (chunker + chain compaction + extract) — companion to the merger-only bench.
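
For orientation, here is a minimal Vec-based sketch of the merge inner loop described in commits 1–3: equal keys consolidate their diffs through a reusable accumulator and zero sums are dropped, while Less/Greater records would be bulk-copied as runs. This is illustrative only; the PR's real code operates on Column<(D, T, R)> leaves and gallops to find run boundaries, and all names here are stand-ins.

```rust
use std::cmp::Ordering;

/// Illustrative stand-in for the Column-based merge: both inputs are sorted
/// by (data, time); equal keys have their diffs summed, zero sums dropped.
fn merge_sorted<D: Ord + Clone, T: Ord + Clone, R>(
    left: &[(D, T, R)],
    right: &[(D, T, R)],
    out: &mut Vec<(D, T, R)>,
) where
    R: Copy + std::ops::AddAssign + Default + PartialEq,
{
    let (mut l, mut r) = (0, 0);
    while l < left.len() && r < right.len() {
        let key_l = (&left[l].0, &left[l].1);
        let key_r = (&right[r].0, &right[r].1);
        match key_l.cmp(&key_r) {
            Ordering::Less => {
                // The real code gallops here to find the run end and
                // bulk-copies the run per leaf (extend_from_slice per leaf).
                out.push((left[l].0.clone(), left[l].1.clone(), left[l].2));
                l += 1;
            }
            Ordering::Greater => {
                out.push((right[r].0.clone(), right[r].1.clone(), right[r].2));
                r += 1;
            }
            Ordering::Equal => {
                // Consolidate: accumulate diffs in a reusable accumulator and
                // drop the record if the sum cancels to zero.
                let mut acc = left[l].2;
                acc += right[r].2;
                if acc != R::default() {
                    out.push((left[l].0.clone(), left[l].1.clone(), acc));
                }
                l += 1;
                r += 1;
            }
        }
    }
    out.extend(left[l..].iter().map(|(d, t, r)| (d.clone(), t.clone(), *r)));
    out.extend(right[r..].iter().map(|(d, t, r)| (d.clone(), t.clone(), *r)));
}
```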

Perf — primitive ((u64, u64), u64, i64)

mz-timely-util/benches/columnar_merger.rs. Three regimes × four per-side heap-footprint targets, measured on Apple Silicon.

┌─────────────────┬────────────────────────┬────────────────────────┬───────────────────────┐
│     Config      │      columnation       │         column         │ column vs columnation │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/32K       │ 18.19 GiB/s (2.9 µs)   │ 4.24 GiB/s (12.4 µs)   │          4.29× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/32K  │ 19.13 GiB/s (2.7 µs)   │ 4.01 GiB/s (12.8 µs)   │          4.77× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/32K    │ 33.68 GiB/s (1.6 µs)   │ 56.73 GiB/s (941 ns)   │          1.68× faster │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/512K      │ 18.84 GiB/s (44.5 µs)  │ 3.56 GiB/s (235.7 µs)  │          5.29× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/512K │ 18.83 GiB/s (44.4 µs)  │ 3.26 GiB/s (256.7 µs)  │          5.78× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/512K   │ 36.29 GiB/s (23.1 µs)  │ 99.15 GiB/s (8.5 µs)   │          2.73× faster │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/8M        │ 7.33 GiB/s (1.83 ms)   │ 2.95 GiB/s (4.55 ms)   │          2.49× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/8M   │ 7.28 GiB/s (1.84 ms)   │ 2.64 GiB/s (5.07 ms)   │          2.76× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/8M     │ 34.67 GiB/s (386.3 µs) │ 146.09 GiB/s (91.7 µs) │          4.21× faster │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/128M      │ 6.88 GiB/s (31.17 ms)  │ 2.67 GiB/s (80.21 ms)  │          2.57× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/128M │ 6.87 GiB/s (31.22 ms)  │ 2.48 GiB/s (86.56 ms)  │          2.77× slower │
├─────────────────┼────────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/128M   │ 27.29 GiB/s (7.86 ms)  │ 18.84 GiB/s (11.37 ms) │          1.45× slower │
└─────────────────┴────────────────────────┴────────────────────────┴───────────────────────┘

Perf — Row-keyed (Row, u64, i64), Merger::merge only

mz-compute/benches/columnar_merger_row.rs. Each Row is (Int64, String) with a 24-character hex-string suffix that overflows CompactBytes's inline budget — so columnation actually exercises its LgAllocRegion copy path and column packs into its Rows value buffer. 32 KiB / 512 KiB / 8 MiB per side; 128 MiB skipped because Row-heavy setup costs would dominate.

┌─────────────────┬───────────────────────┬────────────────────────┬───────────────────────┐
│     Config      │      columnation      │         column         │ column vs columnation │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/32K       │ 3.70 GiB/s (19.8 µs)  │ 3.54 GiB/s (20.8 µs)   │          1.05× slower │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/32K  │ 7.10 GiB/s (4.7 µs)   │ 6.88 GiB/s (4.9 µs)    │          1.03× slower │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/32K    │ 4.09 GiB/s (16.9 µs)  │ 6.81 GiB/s (10.2 µs)   │          1.67× faster │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/512K      │ 3.02 GiB/s (391.0 µs) │ 2.98 GiB/s (395.4 µs)  │          1.01× slower │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/512K │ 5.92 GiB/s (93.3 µs)  │ 5.86 GiB/s (94.3 µs)   │          1.01× slower │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/512K   │ 5.55 GiB/s (200.6 µs) │ 20.31 GiB/s (54.8 µs)  │          3.66× faster │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ mixed/8M        │ 2.54 GiB/s (7.43 ms)  │ 2.78 GiB/s (6.80 ms)   │          1.09× faster │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ collisions/8M   │ 4.48 GiB/s (1.97 ms)  │ 5.46 GiB/s (1.62 ms)   │          1.22× faster │
├─────────────────┼───────────────────────┼────────────────────────┼───────────────────────┤
│ disjoint/8M     │ 4.09 GiB/s (4.35 ms)  │ 28.94 GiB/s (615.2 µs) │          7.07× faster │
└─────────────────┴───────────────────────┴────────────────────────┴───────────────────────┘

Perf — Row-keyed full MergeBatcher push + seal

mz-compute/benches/columnar_merge_batcher_row.rs. Drives the entire batcher cycle for both the legacy (Chunker<ColumnationStack> + ColInternalMerger) and column (ColumnChunker + ColumnMerger) paths: 64 push-rounds of 4 K records each, then a final seal so the chunks flow through Merger::merge (chain compaction during push + final merge) and Merger::extract. Same regimes/sizes as the merger bench.

┌─────────────────┬───────────────────────┬───────────────────────┬───────────────────────┐
│     Config      │      columnation      │        column         │ column vs columnation │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ mixed/32K       │ 824 MiB/s (227.5 µs)  │ 897 MiB/s (209.0 µs)  │          1.09× faster │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ collisions/32K  │ 1.02 GiB/s (180.3 µs) │ 1.02 GiB/s (179.3 µs) │          1.01× faster │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ disjoint/32K    │ 876 MiB/s (214.1 µs)  │ 919 MiB/s (204.1 µs)  │          1.05× faster │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ mixed/512K      │ 547 MiB/s (1.37 ms)   │ 554 MiB/s (1.35 ms)   │          1.01× faster │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ collisions/512K │ 596 MiB/s (1.26 ms)   │ 614 MiB/s (1.22 ms)   │          1.03× faster │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ disjoint/512K   │ 557 MiB/s (1.35 ms)   │ 571 MiB/s (1.31 ms)   │          1.03× faster │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ mixed/8M        │ 337 MiB/s (35.60 ms)  │ 297 MiB/s (40.40 ms)  │          1.14× slower │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ collisions/8M   │ 384 MiB/s (31.23 ms)  │ 369 MiB/s (32.50 ms)  │          1.04× slower │
├─────────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ disjoint/8M     │ 338 MiB/s (35.49 ms)  │ 320 MiB/s (37.55 ms)  │          1.06× slower │
└─────────────────┴───────────────────────┴───────────────────────┴───────────────────────┘

Reading the curves

  • Row workloads with galloping room are where column wins decisively. Headline: Merger::merge Row/disjoint/8 MiB tops out at 28.94 GiB/s vs columnation's 4.09 GiB/s — 7.07× faster. That's the merge-batcher's actual hot path for hydration: pre-sorted batches from persist composing into larger sorted runs.

  • Row + variable-length payload closes the per-leaf push tax. The merger bench shows column tied or winning on every Row config; the only sub-1.0× ratios are within ~5%, well inside thermal noise on Apple Silicon. This is the regime that matters for compute arrangements over real data.

  • Primitives are a structural-floor regime for Column. A 24-byte record × 4 leaf pushes per record vs columnation's single flat-slice push; the per-leaf dispatch cost is fully exposed when the working set fits in cache (worst gap 5.78× at 512 KiB collisions). Galloping (disjoint) still wins decisively at every cache-resident size (peak 4.21× at 8 MiB), where column's per-leaf extend_from_self is the right primitive for long runs.

  • Full-MergeBatcher bench tightens the 8M gap. End-to-end the column path is at-or-better than columnation on every cache-resident size (32 KiB / 512 KiB) and within 1.04–1.14× at 8 MiB. The 8M residual is the structural per-leaf push tax surfacing in merge_from cascades on interleaved keys; cache-resident regimes don't see it because chunker work dominates and the chunker is now competitive.

  • The 128 MiB primitive disjoint slowdown (1.45× slower) is a bandwidth-pathological case: a single ~128 MiB memcpy where columnation's one-slice copy edges out column's three independent leaf copies. Run-to-run noise on Apple Silicon is high here: the column number swings between 15 and 41 GiB/s across runs without code changes. This isn't a regime real workloads hit; chunks are ship-threshold-bounded at 2 MiB.

Test plan

  • cargo nextest run -p mz-timely-util columnar — 15/15 pass (10 hand-written + 5 proptests).
  • cargo bench -p mz-timely-util --bench columnar_merger — produces the primitive table above.
  • cargo bench -p mz-compute --bench columnar_merger_row — produces the Row merger table.
  • cargo bench -p mz-compute --bench columnar_merge_batcher_row — produces the full-batcher table.
  • CI green.

@DAlperin DAlperin requested a review from a team as a code owner May 5, 2026 17:18

@antiguru antiguru left a comment


Very cool! I left some comments inline, but overall this points in the right direction.

Comment thread src/timely-util/src/columnar/batcher.rs Outdated
// inlines through the leaf chain; tracking a local counter
// instead measured slower, presumably because the extra
// updates added register pressure to the hot loop.
let target = timely::container::buffer::default_capacity::<(D, T, R)>();
Member

We shouldn't be using the default capacity here (or ever, really). Instead, I'd copy the heuristic the column builder encodes: If we're within 10% of 2MiB (or a configurable constant, const), we ship the batch. Question is whether we should serialize or not, but I lean toward "yes".
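
For concreteness, a minimal sketch of the heuristic being suggested: ship once the chunk's serialized size is within 10% of a 2 MiB target. The constant and the `serialized_len` parameter are assumptions; how the real column builder measures serialized words isn't shown here.

```rust
/// Ship target mirroring the 2 MiB figure above (could be a configurable const).
const SHIP_TARGET_BYTES: usize = 2 << 20;

/// `serialized_len` is assumed to be the chunk's serialized size in bytes.
fn at_ship_threshold(serialized_len: usize) -> bool {
    // Ship once we are within 10% of the target.
    serialized_len >= SHIP_TARGET_BYTES - SHIP_TARGET_BYTES / 10
}
```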

Member Author

Do you mean serialize on the boundaries? It's trivial enough to serialize in extract but it will just flow directly into a trace builder which will probably have to decode it to bytes anyway right away

Comment thread src/timely-util/src/columnar/batcher.rs Outdated
Comment on lines +394 to +395
let (d1, t1, r1) = left_borrow.get(left_pos[0]);
let (d2, t2, r2) = right_borrow.get(right_pos[0]);
Member

What helped in the consolidating column builder: Use the fact that (D, T, R)::Container is (D::Container, T::Container, R::Container), which allows us to construct the result using three separate containers, as long as the length matches. Especially with galloping, writing runs of values to a single container enables vectorization.
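
A rough sketch of that shape, assuming Vec-like leaf containers with `extend_from_slice` (the actual columnar container types differ): destructure the tuple container once, then write each run to its leaf independently so the copies are plain slice memcpys the compiler can vectorize.

```rust
/// Illustrative only: copy a run of `len` records starting at `start` from
/// per-leaf input slices into per-leaf output containers.
fn copy_run<D: Copy, T: Copy, R: Copy>(
    out: &mut (Vec<D>, Vec<T>, Vec<R>), // stands in for (D, T, R)::Container
    (ds, ts, rs): (&[D], &[T], &[R]),   // the input borrow's leaves
    start: usize,
    len: usize,
) {
    let (sd, st, sr) = out; // destructure once, outside the hot loop
    // Three independent bulk copies; lengths stay equal across leaves.
    sd.extend_from_slice(&ds[start..start + len]);
    st.extend_from_slice(&ts[start..start + len]);
    sr.extend_from_slice(&rs[start..start + len]);
}
```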

Comment thread src/timely-util/src/columnar.rs Outdated
tuple_leaf_reserve!(A a, B b, C c);
tuple_leaf_reserve!(A a, B b, C c, D d);

impl<C: Columnar> SizableContainer for Column<C>
Member

If at all possible, we shouldn't implement SizableContainer for Column. It's legacy, and each use is a smell.

Member

TimelyDataflow/differential-dataflow#738 would allow us to remove the requirement.

Comment thread src/timely-util/src/columnar.rs Outdated
/// fresh chunk's leaf Vecs filling up. Without it, each chunk's leaves grow
/// from zero by doubling, and the cumulative realloc traffic shows up as
/// allocator memmove cost on the hot path.
pub trait LeafReserve {
Member

I think if we switch from the default capacity to serialized capacity, we don't need this helper trait anymore. We'd build into one owned copy, and to ship it we'd serialize into a new one and clear. After the first round, we should have appropriate capacity, so we don't have to worry about pre-sizing.

}

fn bench_merge(c: &mut Criterion) {
let n = 10_000usize;
Member

It might help to define the n not in terms of elements but size, and then scale for specific heap sizes.
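
A sketch of that parameterization, using the in-memory record size as a stand-in for however the bench ultimately accounts bytes per record:

```rust
/// Derive the per-side record count from a target heap footprint.
fn records_for_footprint<Record>(target_bytes: usize) -> usize {
    target_bytes / std::mem::size_of::<Record>()
}

// Sweep L1-resident through DRAM-resident working sets per side, e.g.:
// for target in [32 << 10, 512 << 10, 8 << 20, 128 << 20] {
//     let n = records_for_footprint::<((u64, u64), u64, i64)>(target);
//     // ... generate n records per side and run the merge ...
// }
```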

@DAlperin DAlperin force-pushed the dov/columnar-merger branch from d7d6bb5 to 8687d94 May 5, 2026 22:21

@def- def- left a comment


Column<(D,T,R)>::extract (src/timely-util/src/columnar/batcher.rs:548) violates the InternalMerge::extract contract: it never yields mid-buffer, so the framework can't flush keep/ship until the input is fully drained. Other impls (Vec, ColumnationStack) gate their inner loop on !keep.at_capacity() && !ship.at_capacity().

Output chunks overshoot the ship threshold by up to one full input buffer. Not sure how bad that is, but could waste a bunch of memory?
Small test:

diff --git a/src/timely-util/src/columnar/batcher.rs b/src/timely-util/src/columnar/batcher.rs
index 600b14c4c3..f39b42f846 100644
--- a/src/timely-util/src/columnar/batcher.rs
+++ b/src/timely-util/src/columnar/batcher.rs
@@ -926,4 +926,43 @@ mod proptests {
             prop_assert!(frontier.elements().is_empty());
         }
     }
+
+    proptest! {
+        #![proptest_config(ProptestConfig { cases: 16, ..ProptestConfig::default() })]
+
+        #[mz_ore::test]
+        #[cfg_attr(miri, ignore)]
+        fn extract_yields_when_keep_at_capacity(upper_time in 0u64..=4) {
+            use timely::container::SizableContainer;
+
+            let mut keep: Column<Tuple> = Default::default();
+            let mut k = 0u64;
+            while !keep.at_capacity() {
+                keep.push_into(((k, 0u64), upper_time, 1i64));
+                k += 1;
+            }
+
+            let mut self_col: Column<Tuple> = Default::default();
+            for i in 0..32u64 {
+                self_col.push_into(((i, 1u64), upper_time, 1i64));
+            }
+            let len = self_col.borrow().len();
+
+            let upper = Antichain::from_elem(upper_time);
+            let mut frontier: Antichain<u64> = Antichain::new();
+            let mut ship: Column<Tuple> = Default::default();
+            let mut position = 0;
+
+            InternalMerge::extract(
+                &mut self_col,
+                &mut position,
+                upper.borrow(),
+                &mut frontier,
+                &mut keep,
+                &mut ship,
+            );
+
+            prop_assert!(position < len, "position={}, len={}", position, len);
+        }
+    }
 }


DAlperin commented May 6, 2026

@def- yep good point, will fix tomorrow

@DAlperin DAlperin marked this pull request as draft May 6, 2026 05:06
Comment thread src/timely-util/src/columnar/batcher.rs Outdated
// and ships it regardless.
let at_ship_threshold =
|sd: &D::Container, st: &T::Container, sr: &R::Container| {
use columnar::AsBytes as _;
Member Author

we already have a function for this

Comment thread src/timely-util/src/columnar/batcher.rs Outdated
pub type ColumnMerger<D, T, R> = InternalMerger<Column<(D, T, R)>>;

/// `InternalMerge` for [`Column`]-shaped sorted chunks.
impl<D, T, R> InternalMerge for Column<(D, T, R)>
Member

Apparently InternalMerge is supposed to go away. Let's implement Merger directly?

@DAlperin DAlperin force-pushed the dov/columnar-merger branch 2 times, most recently from 8f4d1ff to 592b765 May 6, 2026 19:40
@DAlperin DAlperin marked this pull request as ready for review May 6, 2026 20:26
@DAlperin DAlperin force-pushed the dov/columnar-merger branch from 592b765 to 7c85d3c May 7, 2026 14:37
DAlperin added 15 commits May 7, 2026 10:50
Replace the columnation detour in the merge-batcher path with all-Column
plumbing. This commit adds the input-side chunker and the InternalMerge
impl; the seal builder and rewiring Col2ValBatcher are follow-ups, so the
legacy Chunker / ColInternalMerger path remains active.

* ColumnChunker<U> — same sort + consolidate loop as Chunker<C> but output
  stays in Column<U>, no ColumnationStack round-trip. Reuses owned scratch
  slots across pushes.
* SizableContainer for Column<C> — at_capacity left unimplemented!()
  (capacity policy deferred); ensure_capacity is a debug_assert! since
  merger-internal chunks are always Column::Typed.
* InternalMerge for Column<(D, T, R)> — 0/1/2-input merge with the
  empty-self bulk-swap shortcut and Container::extend_from_self for the
  Less/Greater arms. Equal-key branch consolidates diffs in a reusable R
  accumulator and drops zero sums. extract partitions by frontier in a
  single pass (no mid-call yielding, matching DD's columnar-trie path).
* ColumnMerger<D, T, R> type alias for InternalMerger<Column<(D, T, R)>>.
* Proptests covering merge_from and extract against a brute-force
  reference; verified by injecting a missing-frontier-insert mutation.

Naive merge for now; galloping the Less/Greater runs is a follow-up.
Builds on the parent commit (gallop view-projection + length-1 push) with
two more optimizations and a microbenchmark for end-to-end measurement.

* **Pre-allocation via `LeafReserve` + mid-merge yield** — adds a
  `LeafReserve` trait (impl'd for `Vec<T>` and tuples) used by
  `Column::ensure_capacity` to pre-reserve `default_capacity` records per
  leaf on each fresh chunk, avoiding the cumulative geometric Vec-grow
  cost. Adds a matching `borrow().len() < target` check in `merge_from`
  so the loop yields back to the merge driver at the chunk boundary
  instead of running to one input's exhaustion and overshooting the
  reservation.
* **Gallop bypass for length-1 runs** — the common case on interleaved
  data is a 1-record advance. Push directly + advance, then peek at the
  next record; only enter `gallop` (and the bulk-copy branch) when an
  actual run is detected. Skips per-iteration `gallop` setup + the first
  cmp probe for runs of length 1.
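
A simplified sketch of the length-1 bypass over an index range; `cmp_at` stands in for the PR's view-projection comparator and a linear scan stands in for the gallop itself:

```rust
use std::cmp::Ordering;

/// Returns the run length starting at `*pos`; a result of 1 means the caller
/// takes the single-record push fast path and skips the bulk-copy branch.
fn advance_run(pos: &mut usize, len: usize, cmp_at: impl Fn(usize) -> Ordering) -> usize {
    let start = *pos;
    // Take one record unconditionally, without paying gallop setup...
    *pos += 1;
    // ...then peek: only search for the run end if the next record continues it.
    while *pos < len && cmp_at(*pos) == Ordering::Less {
        // (The real code gallops here instead of scanning linearly.)
        *pos += 1;
    }
    *pos - start
}
```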

Microbench in `benches/columnar_merger.rs` drives a 2-input merge through
`InternalMerger::merge` over three regimes — mixed (interleaved keys),
collisions (small key range, equal-key consolidation), disjoint
(non-overlapping ranges, long runs). Wall-time on `((u64, u64), u64,
i64)` data with n=10K per side, cumulative across this commit and its
parent:

| config     | columnation | column initial | column final | improvement |
|------------|-------------|----------------|--------------|-------------|
| mixed      |    27.9 µs  |       204 µs   |    138.9 µs  |    -32%     |
| collisions |    28.0 µs  |       220 µs   |    147.0 µs  |    -33%     |
| disjoint   |    14.8 µs  |       6.3 µs   |      6.5 µs  |   ~unchanged|

Column is now ~2.2x faster than columnation on disjoint (long-run-heavy)
data where galloping bulk-copy wins, and ~5x slower on dense
primitive-keyed mixed/collisions data — that residual gap is the
parallel-array layout cost (4 leaf Vecs, 4x per-leaf trait dispatch per
record) vs ColumnationStack's single flat slice. Closing it further
needs either a different wire shape (trie) or variable-length keys
(Row), where per-record overhead amortizes.

Adds a TODO in the bench for a Row-keyed variant; gated on `Rows` (the
columnar container for `Row`) being reachable from a dev-dep, currently
private to a module in `mz-repr`.
- Drop the `copy_from(&mut owned_*) → push(&owned_*)` round-trip for D and T in the `Equal` branch; refs from the input borrow are valid through the merge step, so push them via each leaf's `Push<Ref<_>>` impl directly. The original shape triggered `Row::copy_from = CompactBytes::clear() + extend_from_slice` per equal-key record, which freed and re-mallocated every row that overflowed the inline budget. On Row workloads this was the largest hot spot in the profile.

- Pre-size each output leaf via `Container::reserve_for([left_borrow, right_borrow])` so push paths don't pay geometric `grow_amortized + realloc`. Skipped past a record-count threshold to avoid over-reserving on huge inputs where consolidation makes the input bound a 4× over-estimate.

- Amortize the in-loop `at_ship_threshold` check: bumped from per-iteration to every 1024 iterations via a power-of-two mask. The closure walks 4–5 leaf slices per call, which is non-trivial on variable-length leaves like `Rows`; overshooting the ship boundary by up to a thousand records before noticing has no practical impact — the outer driver's `at_capacity` ships the chunk regardless.
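
A sketch of that amortization pattern; `step` and `at_ship_threshold` are stand-ins for the real per-record merge work and the leaf-walking size check:

```rust
/// Runs `step` until the inputs are exhausted (it returns false) or the ship
/// threshold fires; the threshold is only checked every 1024 iterations via a
/// power-of-two mask.
fn merge_loop(mut step: impl FnMut() -> bool, at_ship_threshold: impl Fn() -> bool) -> bool {
    const CHECK_MASK: usize = 1024 - 1;
    let mut iters = 0usize;
    while step() {
        iters += 1;
        if iters & CHECK_MASK == 0 && at_ship_threshold() {
            return true; // tell the caller to ship the output chunk
        }
    }
    false // inputs exhausted first; caller falls back to its at_capacity check
}
```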
…er InternalMerge contract

- The chunker's `copy_from(&mut owned_*) → push(&owned_*)` round-trip
  for D and T triggered `Row::copy_from = CompactBytes::clear() +
  extend_from_slice` per output record, freeing and re-mallocating every
  row that overflowed the inline budget. Refs from the input borrow are
  valid through the sweep, so push them via each leaf's `Push<Ref<_>>`
  impl directly. Same fix as `merge_from`'s Equal branch from the
  previous commit. R still uses an owned scratch since it carries the
  consolidated sum.

- `extract` now yields when either output buffer reaches the ship
  threshold. Required by `InternalMerge::extract`'s contract: the
  framework only checks `at_capacity` between calls, so without an
  inner-loop yield a single call could fill an output buffer well past
  threshold.
The InternalMerger wrapper is being removed upstream. Replace the
`pub type ColumnMerger<D, T, R> = InternalMerger<Column<(D, T, R)>>`
type alias with a phantom-marker struct that implements diff-dataflow's
`Merger` trait directly.

The per-chunk merge_from / extract logic stays in this module, moved
from the dropped `InternalMerge` impl onto inherent methods on
`Column<(D, T, R)>`. `ColumnMerger::merge` and `::extract` orchestrate
those over chains of chunks (drain pairs in lockstep, ship full results,
recycle inputs via stash) — a specialized port of the previous generic
`InternalMerger` orchestration. Proptests now call the inherent methods
directly.

No behaviour change: same merge/consolidate/extract semantics, same
`SizableContainer::at_capacity` ship signal, same gallop / pre-reserve /
amortized threshold-check optimizations.
Two optimizations in the merge driver, unlocked by owning Merger
directly:

1. Whole-chunk passthrough in `Merger::merge`. When one head's tail
   (from its current position) is sortable-before the other head's
   current record, the entire tail is appended to `output` via
   `mem::replace` — no per-record compares, no per-leaf
   `extend_from_self` byte copies. Two probes per outer iteration
   gate the fast path; the `<` boundary preserves the strict-sort
   invariant across output chunks (so equal-key consolidation still
   runs through `merge_from` as before).

2. `Column::merge_from` returns `bool`. `true` signals the inner
   amortized ship-threshold check fired; the outer loop ships
   without redoing the `at_capacity` walk on the result's leaf
   bytes. `false` (input-exhausted path) still falls through to the
   existing `at_capacity` check to catch overshoot accumulation
   across short calls.
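
A Vec-shaped sketch of the passthrough gate described in point 1 (the real code moves Column chunks with mem::replace; the tuple layout and names here are illustrative assumptions):

```rust
use std::mem;

/// Append the whole remaining chunk when its last key sorts strictly before
/// the other head's first key; returns true if the fast path fired.
fn try_passthrough<K: Ord, T, R>(
    head: &mut Vec<(K, T, R)>,       // one chain's head chunk, from its position
    other_first: Option<&(K, T, R)>, // the other chain's current record
    output: &mut Vec<(K, T, R)>,
) -> bool {
    match (head.last(), other_first) {
        // Strict `<` keeps equal keys in the record-by-record path so they
        // still get consolidated.
        (Some(last), Some(first)) if last.0 < first.0 => {
            let chunk = mem::replace(head, Vec::new());
            output.extend(chunk); // no per-record compares, no per-leaf copies
            true
        }
        _ => false,
    }
}
```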

Three new tests in addition to the existing 15 (10 hand-written +
5 proptests), driving `Merger::merge` directly:
- `merger_disjoint_chains_passthrough` — disjoint ranges, every
  chunk hits the fast path.
- `merger_interleaved_chains` — alternating keys, fast path never
  fires, exercises the bool-return path.
- `merger_passthrough_respects_equal_boundary` — chain1.last ==
  chain2.first; passthrough must NOT fire (would split equal keys
  across output chunks unconsolidated).

Perf — full `MergeBatcher` Row push + seal,
`mz-compute/benches/columnar_merge_batcher_row.rs`. Column path
percent-change vs the `Merger`-direct baseline (criterion
`--baseline before`):

  mixed/8M:        +20% throughput  (-16.5% time)
  collisions/8M:   +6%              (-5.7%)
  disjoint/8M:     +7%              (-6.8%)
  mixed/512K:      +1%              (within noise)
  collisions/512K: +7%              (-6.7%)
  disjoint/512K:   +3%              (-2.4%)

32K sizes within noise — passthrough barely fires when the chain
is one or two chunks. 8M is where the wins concentrate: chains
have ~125 chunks each, so the passthrough fast path applies many
times per merge, and the cumulative byte-copy savings show
through end-to-end.
Clears `clippy::as_conversions` across the columnar merger benches
and one proptest. Conversions split between three idioms:

  - `CastFrom` for widening integer casts (`u64::cast_from(usize)`,
    `usize::cast_from(u32)`).
  - `CastLossy` for f64-from-integer (the throughput formatters
    that convert byte counts to GiB/s).
  - `ReinterpretCast` for u64↔i64, used by Row keys round-tripped
    through `Datum::Int64`.

Also drops a needless `Some(...?)` wrapper on the bench's
`config_bytes` helper.
Adds a `sorted` regime to the e2e MergeBatcher Row bench, modeling
the persist-hydration shape: one globally sorted+consolidated dataset
of `n_total` records, sliced into contiguous rounds whose key ranges
are non-overlapping with their neighbors.

This is what compute sees when reading a sorted snapshot from
persist — each on-the-wire batch is a tile of the same global sort
order — and it's the regime where the column path's whole-chunk
passthrough fast path fires on every chain compaction.

Head-to-head e2e MergeBatcher results — Row, push 64 rounds + final
seal, throughput in payload bytes:

┌────────────┬───────────────────────┬───────────────────────┬───────────────────────┐
│  Config    │      columnation      │        column         │ column vs columnation │
├────────────┼───────────────────────┼───────────────────────┼───────────────────────┤
│ sorted/32K │ 2.32 GiB/s (78.8 µs)  │ 2.54 GiB/s (72.1 µs)  │          1.09× faster │
│ sorted/512K│ 1.49 GiB/s (490.3 µs) │ 2.52 GiB/s (290.9 µs) │          1.69× faster │
│ sorted/8M  │ 1003 MiB/s (11.97 ms) │ 2.32 GiB/s (5.04 ms)  │          2.37× faster │
└────────────┴───────────────────────┴───────────────────────┴───────────────────────┘

The advantage grows with size: longer chains mean more chain-compaction
merges, each of which fires the whole-chunk passthrough and skips the
per-record cmp + per-leaf byte copy that columnation can't avoid.
At 8 MiB per side both paths drop into DRAM bandwidth, where saving
the merge's byte movement shows up cleanly.

For reference, the same column path on `mixed/8M` (random keys) lands
at 338 MiB/s — sorted-shape data is ~7× faster on the same code,
2.3× faster than columnation on the same regime.
Mostly comment cleanup left over from the move from mz-repr to
mz-compute and from dropping the InternalMerge trait impl in favour
of a direct Merger impl. No behaviour change.

- Bench doc headers no longer claim to live in mz-repr.
- `columnar_merger.rs`'s top-level TODO retired (the Row-keyed
  variant exists now in mz-compute).
- Stale `InternalMerger::merge` reference in the primitive bench's
  doc points at the trait method actually called.
- Leftover commented-out `C2` trait bound in `Chunker`'s
  `PushInto` impl removed.
- `ensure_capacity`'s rationale rewritten — `SizableContainer`
  exists for our own `at_capacity` calls in `Merger::merge`
  orchestration, not because of a defunct `InternalMerge`
  supertrait.

Also two small semantic fixes:

- `Merger::account` now `expect("record_count is non-negative")`
  instead of `unwrap_or(0)`, matching upstream
  `InternalMerge::account`'s default. A negative `record_count` is
  a bug, and silently zeroing it would defeat the telemetry that
  `account` exists for.
- `merge_from`'s `n => unimplemented!("… {n} inputs")` arm became
  `unreachable!("merge_from called with {n} inputs; expected 0, 1,
  or 2")` — `Merger::merge` only ever passes 0/1/2 chunks, and
  `unimplemented` implied a feature gap rather than the defensive
  guard it actually is.
- Two `(a + b - 1) / b` ceil-divs in the e2e bench replaced with
  `a.div_ceil(b)`, matching the existing usage in the same file.
@DAlperin DAlperin force-pushed the dov/columnar-merger branch from 7c85d3c to d0f9658 May 7, 2026 14:50