From 23bf6b912341f617cb8c704428e9531bbce66899 Mon Sep 17 00:00:00 2001 From: prostomarkeloff <28061158+prostomarkeloff@users.noreply.github.com> Date: Sat, 30 May 2026 16:35:24 +0300 Subject: [PATCH] Add simjoin: exact weighted-cosine similarity join (L2AP, CPU+GPU) + Python bindings (v0.3.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit simjoin is an exact all-pairs weighted-cosine similarity join over sparse non-negative vectors — every pair with cos >= t, no approximation — the principled exact replacement for shingle-candidate + verify near-duplicate detection (Type-3 code clones). - src/simjoin.rs: SOTA L2AP (inverted index + Cauchy-Schwarz prefix prune) + branchless sorted-merge dot + cache-packed prune state + rayon parallel probe. Asserted bit-identical to an O(n^2) brute-force oracle on fuzzed corpora. API: Corpus::{from_rows, from_token_docs}, cosine_join, cosine_join_with(Concurrency), CosineJoiner handle. - src/simjoin_gpu.rs: Metal batch_cosine kernel. The verify is memory-bandwidth-bound; the Apple GPU clears the random-gather dots ~3x the CPU (53 vs 22 GB/s). Metal is f32-only, so f64 byte-parity is preserved via GPU-filter + CPU-exact-reverify (gpu+cpu), with a pure-f32 fast path (gpu, <=1 differing pair per millions). ~1.8-2x end-to-end on real top-300 PyPI. - Three backends behind the existing Concurrency enum (cpu / gpu+cpu / gpu), CPU fallback. - Python: difflib_fast.cosine_join(docs, t, concurrency, threads) + CosineJoiner — token docs to TF-IDF in Rust, auto-parallel (rayon, GIL released) like ratio_many. - examples/{simjoin_bench, simjoin_pypi, simjoin_gpu_bench}; profiling cargo feature. - README + benchmarks.md: simjoin section. Bump 0.2.0 -> 0.3.0. --- Cargo.lock | 2 +- Cargo.toml | 13 +- README.md | 48 +- benchmarks.md | 61 ++- examples/simjoin_bench.rs | 115 +++++ examples/simjoin_gpu_bench.rs | 236 +++++++++ examples/simjoin_pypi.rs | 94 ++++ python/difflib_fast/__init__.py | 11 +- python/difflib_fast/__init__.pyi | 43 ++ src/lib.rs | 111 ++++- src/simjoin.rs | 816 +++++++++++++++++++++++++++++++ src/simjoin_gpu.rs | 170 +++++++ 12 files changed, 1701 insertions(+), 19 deletions(-) create mode 100644 examples/simjoin_bench.rs create mode 100644 examples/simjoin_gpu_bench.rs create mode 100644 examples/simjoin_pypi.rs create mode 100644 src/simjoin.rs create mode 100644 src/simjoin_gpu.rs diff --git a/Cargo.lock b/Cargo.lock index b356f19..4d55df0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,7 +78,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "difflib-fast" -version = "0.2.0" +version = "0.3.0" dependencies = [ "metal", "mimalloc", diff --git a/Cargo.toml b/Cargo.toml index a059621..b59ba55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "difflib-fast" -version = "0.2.0" -description = "Fast, byte-for-byte exact difflib Ratcliff–Obershelp (gestalt) similarity ratio + single-linkage clustering, via a suffix automaton." +version = "0.3.0" +description = "Fast, byte-for-byte exact difflib Ratcliff–Obershelp (gestalt) similarity ratio + single-linkage clustering (suffix automaton), plus an exact all-pairs weighted-cosine similarity join (L2AP, CPU+GPU)." keywords = ["difflib", "similarity", "ratcliff-obershelp", "suffix-automaton", "fuzzy"] categories = ["algorithms", "text-processing"] edition = "2021" @@ -48,6 +48,10 @@ gpu = ["dep:metal"] # lengths. Costs ~10-20% wall when active (relaxed-ordered atomics on the hot path); the # `cfg(feature)` gate compiles to a no-op in default builds so production stays untouched. instrument = [] +# `profiling` marks `simjoin`'s hot phases `#[inline(never)]` so the sampler (samply) attributes +# self-time per phase (candidate-gen vs verify vs index-suffix) instead of one inlined `cosine_join` +# blob. Pure observability — compiles to identical codegen as default when off. Never ship it. +profiling = [] # `objc`'s `sel!`/`msg_send!` macros (pulled in transitively by the `metal` crate under the `gpu` # feature) expand to `cfg(feature = "cargo-clippy")` checks — whitelist that cfg so the lint stays @@ -67,6 +71,11 @@ pedantic = { level = "deny", priority = -1 } name = "bench" required-features = ["bench"] +# GPU-vs-CPU throughput experiment for the simjoin verify step (Apple Metal). Needs the `gpu` feature. +[[example]] +name = "simjoin_gpu_bench" +required-features = ["gpu"] + # Keep symbols + line tables in release so an external sampler (samply) can resolve frames. [profile.release] debug = "line-tables-only" diff --git a/README.md b/README.md index 2e888e4..2c7d3de 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ assert_eq!(ratio("the quick brown fox", "the quick brown dog"), 0.89473684210526 ``` ```toml -difflib-fast = "0.1" +difflib-fast = "0.3" ``` --- @@ -140,7 +140,7 @@ default (the GPU paths remain opt-in via `DFGPU_RATIO_MANY_THRESHOLD` / `DFGPU_M the feature off, on non-macOS, or with no Metal device, every call quietly runs on CPU. ```toml -difflib-fast = { version = "0.1", features = ["gpu"] } # macOS only +difflib-fast = { version = "0.3", features = ["gpu"] } # macOS only ``` --- @@ -218,7 +218,7 @@ pick the wheel for you — grab the one for your platform from the ```bash # macOS Apple Silicon — swap the filename for your platform (see below): -pip install https://github.com/prostomarkeloff/difflib-fast/releases/download/v0.2.0/difflib_fast-0.2.0-cp39-abi3-macosx_11_0_arm64.whl +pip install https://github.com/prostomarkeloff/difflib-fast/releases/download/v0.3.0/difflib_fast-0.3.0-cp39-abi3-macosx_11_0_arm64.whl ``` | platform | wheel suffix | @@ -298,6 +298,48 @@ default, so list both). --- +## Also: exact cosine similarity join (`simjoin`) + +The same "exact, or it's a bug" discipline, pointed at a different metric. **`simjoin`** is an exact +all-pairs **weighted-cosine** similarity join over sparse non-negative vectors — *every* pair with +`cos ≥ t`, no LSH, no approximation — on the provably-SOTA **L2AP** algorithm (inverted index + +Cauchy–Schwarz prefix pruning; Anastasiu & Karypis, ICDE'14). It's the principled exact replacement for +"shingle candidates → verify" near-duplicate detection: documents = functions, dimensions = canonical +lines, weights = IDF — i.e. **exact Type-3 code-clone detection**. + +```python +import difflib_fast as df + +# documents as token lists → TF-IDF in Rust → every pair with cosine ≥ 0.8 +docs = [["def _fn(_v0):", "return _v0 + 1"], + ["def _fn(_v0):", "return _v0 + 1"], # an exact clone of doc 0 + ["import os", "import sys"]] +df.cosine_join(docs, 0.8) # → [(0, 1, 1.0)] tuples are (j, i, cos), j < i +df.cosine_join(docs, 0.8, "gpu") # same join, the dot-products run on the Metal GPU +``` + +Three backends, one argument (`concurrency=`) — all auto-parallel across every core (rayon, GIL +released, exactly like `ratio`): + +| `concurrency` | how | result | +|---|---|---| +| `"cpu"` | L2AP on all cores | exact `f64` | +| `"gpu+cpu"` | CPU prunes ~99% of candidates, GPU verifies the rest (f32 filter), CPU re-scores survivors exactly | **byte-identical to `"cpu"`** | +| `"gpu"` | CPU prunes, GPU verifies, emit the f32 score | ε-exact (≤ 1 differing pair per **millions**) | + +On the real **top-300 PyPI** corpus (287,408 functions, 3.1M clone pairs found) the verify is +memory-**bandwidth**-bound, and the Apple GPU's memory-level parallelism wins it: **53 GB/s** of +random-gather sparse dot-products vs the CPU's 22 GB/s, so the GPU backends run the whole join +**~1.8–2× faster than the (already L2AP-tuned) CPU**, byte-for-byte. Brute force would be ~4·10¹⁰ pairs +(hours); this is seconds. `CosineJoiner(docs)` is the stateful handle (build corpus + GPU upload once, +sweep thresholds); full numbers in [`benchmarks.md`](benchmarks.md#6-similarity-join-simjoin). + +In Rust: `difflib_fast::simjoin::{Corpus, cosine_join, cosine_join_with, CosineJoiner}` (GPU backends +behind the `gpu` feature). Same correctness gate as the rest of the crate — the indexed join is +asserted **bit-identical to an O(n²) brute-force oracle** on hundreds of fuzzed corpora. + +--- + ## How it works The metric is **Ratcliff–Obershelp** (Ratcliff & Obershelp, 1988) computed over a **suffix automaton** diff --git a/benchmarks.md b/benchmarks.md index fe4faef..dbd700c 100644 --- a/benchmarks.md +++ b/benchmarks.md @@ -2,7 +2,9 @@ Exact **Ratcliff–Obershelp** (Python `difflib.SequenceMatcher(..., autojunk=False).ratio()`) throughput and head-to-head vs every other implementation we could find. `pairs/s` = pairwise `ratio` decisions -per second. Speedups (`Nx`) are **difflib-fast ÷ competitor**. +per second. Speedups (`Nx`) are **difflib-fast ÷ competitor**. §1–5 + Landscape cover the RO path; +[**§6**](#6-similarity-join-simjoin) benches the crate's other exact capability — `simjoin`, an +all-pairs weighted-cosine similarity join (L2AP) — on a real 287k-function corpus, CPU vs GPU. ## Setup @@ -270,3 +272,60 @@ exists to close. The libraries that beat difflib-fast outright on raw speed (RapidFuzz, strsim) do so by computing a **different metric** (Indel/Levenshtein), not difflib's ratio — so they aren't drop-in replacements for `difflib.ratio()`. + +--- + +## 6. Similarity join (`simjoin`) — exact weighted-cosine + +A different capability in the same crate, held to the same exactness bar. **`simjoin`** is an exact +all-pairs **weighted-cosine** similarity join over sparse non-negative vectors (every pair with +`cos ≥ t`), on the SOTA **L2AP** algorithm — inverted index + Cauchy–Schwarz prefix-prune (Anastasiu & +Karypis, ICDE'14). The exact replacement for shingle-candidate + verify near-duplicate detection +(functions × IDF-weighted canonical lines = exact Type-3 clone detection). Correctness gate: the indexed +join is **bit-identical to an `O(n²)` brute-force oracle** on fuzzed corpora — the same +"two implementations, one answer" discipline as the RO path. + +**Corpus:** the real **top-300 PyPI** Type-3 snapshot — **287,408 functions**, 1.53M distinct canonical +lines, mean 11.4 lines/function. Brute force is `287k²/2 ≈ 4·10¹⁰` pairs (hours); the join runs in +seconds. M3 Pro (6 P + 6 E cores), 12 threads. + +### 6a. Full join — three backends + +`cosine_join` across its three `Concurrency` backends. `"cpu"` and `"gpu+cpu"` are **byte-for-byte +identical**; `"gpu"` (pure f32) differs by ≤ 1 pair in millions — cosine is a sum of non-negative +products, so there's no cancellation, f32 is ~1e-6 accurate, and only threshold-boundary pairs can flip: + +| threshold | pairs found | `cpu` (exact f64) | `gpu+cpu` (exact f64) | `gpu` (f32) | +|---|---|---|---|---| +| 0.8 | 3,115,369 | 2.9–3.5 s | **1.6–2.0 s · ~1.8×** | 1.6 s · 1.9× | +| 0.7 | 4,427,097 | 4.6–5.2 s | 2.4 s · 1.9× | **2.3 s · 2.0×** | + +(`gpu` differed from exact by **1 pair of 3,115,369** at t=0.8, **0 of 4,427,097** at t=0.7; max cosine +gap on shared pairs 8.8e-7.) + +### 6b. Why the GPU wins — the verify is bandwidth-bound + +The join's dominant cost is the **verify**: ~10⁸ candidate pairs, each an `O(nnz)` sparse dot that +gathers two random CSR rows. With every core gathering at once it's memory-**bandwidth**-bound (not +compute), and the Apple GPU sustains far more in-flight memory requests against the same unified-memory +pool. Measured on 20M random sparse-dot pairs (173 B/pair, f32): + +| backend | throughput | effective gather bandwidth | % of ~150 GB/s peak | +|---|---|---|---| +| GPU (Metal, incl. pair upload) | 309 M pairs/s | **53 GB/s** | ~36% | +| CPU rayon (12 threads) | 126 M pairs/s | 22 GB/s | ~14% | +| CPU serial | 17 M pairs/s | 2.9 GB/s | ~2% | + +GPU f32 matches CPU f32 to **6e-8** (the kernel is correct). On a scatter pattern, 36% of peak is the +GPU's memory-level-parallelism edge — the CPU stalls on each random gather where the GPU keeps hundreds +in flight. + +**Honest read.** The hardware (Metal) is **f32-only** — no `double` — so a GPU dot can't be +bit-identical to the CPU `f64`. `gpu+cpu` works around it: the GPU f32 dot is a *conservative filter* +(it rejects only what's clearly below `t`), and the CPU recomputes the exact `f64` score on the ~3% of +survivors that pass — so the pair set + scores are byte-identical to `cpu`. The `gpu` backend skips the +re-verify and emits f32 (≤1 differing pair per millions, immaterial at a similarity threshold). The CPU +backend itself is already L2AP-tuned to the bandwidth wall — branchless sorted-merge dot, cache-packed +prune state, full-index-then-parallel-probe — so the GPU's ~2× sits on top of an already-fast baseline +(itself ~23× over a naive inverted-index join on synthetic Zipfian data). Methodology + Metal kernel: +[`examples/simjoin_gpu_bench.rs`](examples/simjoin_gpu_bench.rs), `src/simjoin_gpu.rs`. diff --git a/examples/simjoin_bench.rs b/examples/simjoin_bench.rs new file mode 100644 index 0000000..3b65266 --- /dev/null +++ b/examples/simjoin_bench.rs @@ -0,0 +1,115 @@ +//! Bench harness for `simjoin::cosine_join` on a synthetic Zipfian-skewed sparse corpus (few common +//! dims, many rare — like IDF-weighted lines/tokens). Reproducible, no I/O, scalable. +//! +//! `cargo run --release --example simjoin_bench -- [n] [nnz] [ndims] [threshold] [reps]` +//! defaults: n=100000 nnz=14 ndims=20000 t=0.7 reps=3 + +#![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss, clippy::cast_sign_loss)] + +use std::time::Instant; + +use difflib_fast::simjoin::{cosine_join, Corpus}; +#[cfg(feature = "profiling")] +use difflib_fast::simjoin::cosine_join_counts; + +fn arg(i: usize, def: T) -> T { + std::env::args().nth(i).and_then(|s| s.parse().ok()).unwrap_or(def) +} + +/// Deterministic xorshift → IDF-weighted sparse rows. Each vector draws `nnz` distinct dims with a +/// cubic bias toward low ids (so low ids are common, high ids rare); the per-dim weight is its IDF +/// `ln(n / df)` computed from the generated corpus — the realistic weighted-cosine input shape. +fn gen(n: usize, nnz: usize, ndims: usize, seed: u64) -> Vec> { + let mut s = seed; + let mut next = move || { + s ^= s << 13; + s ^= s >> 7; + s ^= s << 17; + s + }; + // 1. distinct dims per vector (cubic skew → Zipfian-ish frequencies). ~5% of vectors are planted + // near-duplicates of an earlier one (so real cosine clusters exist → the verify path runs). + let mut sets: Vec> = Vec::with_capacity(n); + for i in 0..n { + let dup = i > 0 && (next() % 100) < 5; + let mut v: Vec = if dup { + let src = (next() as usize) % i; + let mut c = sets[src].clone(); + if !c.is_empty() { + let k = (next() as usize) % c.len(); + c[k] = (next() % ndims as u64) as u32; // mutate one dim + } + c + } else { + (0..nnz) + .map(|_| { + let u = (next() >> 11) as f64 / (1u64 << 53) as f64; // [0,1) + ((u * u * u) * ndims as f64) as u32 % ndims as u32 + }) + .collect() + }; + v.sort_unstable(); + v.dedup(); + sets.push(v); + } + let mut df = vec![0u32; ndims]; + for v in &sets { + for &d in v { + df[d as usize] += 1; + } + } + // 2. weight each present dim by its IDF. + sets.into_iter() + .map(|v| { + v.into_iter() + .map(|d| { + let idf = (n as f64 / f64::from(df[d as usize]).max(1.0)).ln(); + (d, idf) + }) + .collect() + }) + .collect() +} + +fn main() { + let n: usize = arg(1, 100_000); + let nnz: usize = arg(2, 14); + let ndims: usize = arg(3, 20_000); + let t: f64 = arg(4, 0.7); + let reps: usize = arg(5, 3); + + let rows = gen(n, nnz, ndims, 0x1234_5678_9abc_def1); + let build0 = Instant::now(); + let corpus = Corpus::from_rows(&rows); + let build_ms = build0.elapsed().as_secs_f64() * 1000.0; + + // Strategy diagnostic (profiling builds only): posting touches / candidates / pairs. The + // candidates-per-pair ratio decides whether to prune harder or speed the dot up. + #[cfg(feature = "profiling")] + if std::env::var("STATS").is_ok() { + let (ncand, survivors, pairs) = cosine_join_counts(&corpus, t); + eprintln!( + "STATS n={n} t={t} | candidates={ncand} survivors(cos_full)={survivors} pairs={pairs} \ + | prune_pass={:.4} cos_full_saved={:.4} survivor_precision={:.3}", + survivors as f64 / ncand.max(1) as f64, + 1.0 - survivors as f64 / ncand.max(1) as f64, + pairs as f64 / survivors.max(1) as f64, + ); + } + + let mut ms: Vec = Vec::with_capacity(reps); + let mut npairs = 0usize; + for _ in 0..reps { + let t0 = Instant::now(); + let pairs = cosine_join(&corpus, t); + ms.push(t0.elapsed().as_secs_f64() * 1000.0); + npairs = pairs.len(); + std::hint::black_box(&pairs); + } + ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + eprintln!( + "n={n} nnz={nnz} ndims={ndims} t={t} | build={build_ms:.0}ms | join: min={:.1}ms median={:.1}ms | pairs={npairs}", + ms[0], + ms[reps / 2], + ); +} diff --git a/examples/simjoin_gpu_bench.rs b/examples/simjoin_gpu_bench.rs new file mode 100644 index 0000000..0455220 --- /dev/null +++ b/examples/simjoin_gpu_bench.rs @@ -0,0 +1,236 @@ +//! GPU-vs-CPU throughput experiment for `simjoin`'s bandwidth-bound verify step. +//! +//! Loads the binary corpus, builds the CSR, generates a batch of `P` random candidate pairs, and +//! times the f32 sparse-cosine dot over that batch on (a) the GPU (`batch_cosine` kernel, incl. pair +//! upload), (b) the CPU serially, (c) the CPU with rayon. Reports pairs/sec each — answering whether +//! the Apple GPU clears the random-gather dots faster than the tuned CPU. +//! +//! `cargo run --release --features gpu --example simjoin_gpu_bench -- [npairs] [reps]` + +#![allow( + clippy::cast_possible_truncation, + clippy::cast_precision_loss, + clippy::many_single_char_names, + clippy::comparison_chain, + clippy::too_many_lines +)] + +use std::time::Instant; + +use std::collections::HashSet; + +use difflib_fast::simjoin::{cosine_join, cosine_join_gpu, cosine_join_gpu_f32, Corpus}; +use difflib_fast::simjoin_gpu::BatchCosineGpu; +use rayon::prelude::*; + +fn le_u32(b: &[u8], p: usize) -> u32 { + u32::from_le_bytes(b[p..p + 4].try_into().unwrap()) +} + +fn arg(i: usize, def: T) -> T { + std::env::args().nth(i).and_then(|s| s.parse().ok()).unwrap_or(def) +} + +fn load(path: &str) -> Vec> { + let b = std::fs::read(path).expect("read corpus"); + let n = le_u32(&b, 8) as usize; + let mut p = 16usize; + let mut rows = Vec::with_capacity(n); + for _ in 0..n { + let nnz = le_u32(&b, p) as usize; + p += 4; + let mut row = Vec::with_capacity(nnz); + for _ in 0..nnz { + let d = le_u32(&b, p); + let w = f64::from(f32::from_le_bytes(b[p + 4..p + 8].try_into().unwrap())); + p += 8; + row.push((d, w)); + } + rows.push(row); + } + rows +} + +#[inline] +fn cpu_dot(indptr: &[u32], dims: &[u32], wts: &[f32], a: u32, b: u32) -> f32 { + let (mut ia, ea) = (indptr[a as usize] as usize, indptr[a as usize + 1] as usize); + let (mut ib, eb) = (indptr[b as usize] as usize, indptr[b as usize + 1] as usize); + let mut s = 0.0f32; + while ia < ea && ib < eb { + let (da, db) = (dims[ia], dims[ib]); + if da == db { + s += wts[ia] * wts[ib]; + ia += 1; + ib += 1; + } else if da < db { + ia += 1; + } else { + ib += 1; + } + } + s +} + +fn main() { + let path: String = arg(1, "perf-local/pypi-type3.simjoin.bin".to_string()); + let np: usize = arg(2, 20_000_000); + let reps: usize = arg(3, 3); + + let rows = load(&path); + let corpus = Corpus::from_rows(&rows); + let (indptr, dims, wts) = corpus.csr_f32(); + let n = corpus.len(); + + // Deterministic random valid pairs (each gathers two random CSR rows — the bandwidth pattern). + let mut s = 0x1234_5678_9abc_def1u64; + let mut next = move || { + s ^= s << 13; + s ^= s >> 7; + s ^= s << 17; + s + }; + let mut pa = Vec::with_capacity(np); + let mut pb = Vec::with_capacity(np); + for _ in 0..np { + pa.push((next() as usize % n) as u32); + pb.push((next() as usize % n) as u32); + } + + let Some(gpu) = BatchCosineGpu::new(&indptr, &dims, &wts) else { + eprintln!("no Metal device — skipping"); + return; + }; + eprintln!("device: {} | n={n} nnz={} | npairs={np}", gpu.device_name(), dims.len()); + + let med = |mut v: Vec| { + v.sort_by(|a, b| a.partial_cmp(b).unwrap()); + v[v.len() / 2] + }; + let rate = |ms: f64| np as f64 / (ms / 1000.0) / 1e6; // M pairs/sec + // Effective gather bandwidth: bytes of row data actually read = Σ (nnz_a+nnz_b) × 8 B per pair + // (u32 dim + f32 weight). This is the dominant traffic on the random-gather verify. + let row_bytes: u64 = (0..np) + .map(|k| { + let la = u64::from(indptr[pa[k] as usize + 1] - indptr[pa[k] as usize]); + let lb = u64::from(indptr[pb[k] as usize + 1] - indptr[pb[k] as usize]); + (la + lb) * 8 + }) + .sum(); + let gbs = |ms: f64| row_bytes as f64 / (ms / 1000.0) / 1e9; // GB/s of gathered row data + + // GPU (includes pair upload — a real hybrid must materialise pairs too). + let mut g = Vec::new(); + let mut gpu_out = Vec::new(); + for _ in 0..reps { + let t0 = Instant::now(); + gpu_out = gpu.cosine_batch(&pa, &pb); + g.push(t0.elapsed().as_secs_f64() * 1000.0); + } + let gpu_ms = med(g); + + // CPU serial. + let mut cs = Vec::new(); + for _ in 0..reps { + let t0 = Instant::now(); + let mut acc = 0.0f32; + for k in 0..np { + acc += cpu_dot(&indptr, &dims, &wts, pa[k], pb[k]); + } + cs.push(t0.elapsed().as_secs_f64() * 1000.0); + std::hint::black_box(acc); + } + let cpu_serial_ms = med(cs); + + // CPU rayon. + let mut cp = Vec::new(); + for _ in 0..reps { + let t0 = Instant::now(); + let acc: f32 = (0..np) + .into_par_iter() + .map(|k| cpu_dot(&indptr, &dims, &wts, pa[k], pb[k])) + .sum(); + cp.push(t0.elapsed().as_secs_f64() * 1000.0); + std::hint::black_box(acc); + } + let cpu_par_ms = med(cp); + + // Sanity: GPU f32 vs CPU f32 should agree to ~1e-4 (same precision/algorithm). + let mut maxdiff = 0.0f32; + for k in 0..np.min(100_000) { + let c = cpu_dot(&indptr, &dims, &wts, pa[k], pb[k]); + maxdiff = maxdiff.max((gpu_out[k] - c).abs()); + } + + eprintln!( + "row_bytes={:.0}MB over {np} pairs (mean {:.0}B/pair)", + row_bytes as f64 / 1e6, + row_bytes as f64 / np as f64, + ); + eprintln!( + "GPU(+upload): {gpu_ms:.1}ms = {:.0} Mpairs/s = {:.1} GB/s | \ + CPU serial: {cpu_serial_ms:.1}ms = {:.0} M/s = {:.1} GB/s | \ + CPU rayon: {cpu_par_ms:.1}ms = {:.0} M/s = {:.1} GB/s | GPU/CPUrayon {:.2}x | maxdiff={maxdiff:.1e}", + rate(gpu_ms), + gbs(gpu_ms), + rate(cpu_serial_ms), + gbs(cpu_serial_ms), + rate(cpu_par_ms), + gbs(cpu_par_ms), + cpu_par_ms / gpu_ms, + ); + + // --- Full hybrid join vs CPU join, at the threshold given in env SJ_T (default 0.8) --- + let jt: f64 = std::env::var("SJ_T").ok().and_then(|s| s.parse().ok()).unwrap_or(0.8); + + let c0 = Instant::now(); + let cpu_pairs = cosine_join(&corpus, jt); + let cpu_join_ms = c0.elapsed().as_secs_f64() * 1000.0; + + let h0 = Instant::now(); + let gpu_pairs = cosine_join_gpu(&corpus, jt, &gpu); + let hyb_join_ms = h0.elapsed().as_secs_f64() * 1000.0; + + // Parity: the hybrid MUST return the exact same pair set + scores as the pure-CPU join. + let mut a = cpu_pairs; + let mut b = gpu_pairs; + a.sort_by_key(|x| (x.0, x.1)); + b.sort_by_key(|x| (x.0, x.1)); + let same = a.len() == b.len() + && a.iter().zip(&b).all(|(x, y)| x.0 == y.0 && x.1 == y.1 && x.2.to_bits() == y.2.to_bits()); + + eprintln!( + "JOIN t={jt} pairs={} | CPU: {cpu_join_ms:.0}ms | hybrid GPU+CPU: {hyb_join_ms:.0}ms \ + | speedup {:.2}x | parity={}", + a.len(), + cpu_join_ms / hyb_join_ms, + if same { "BIT-IDENTICAL ✓" } else { "MISMATCH ✗" }, + ); + + // --- Pure-f32 join: is "f32 everywhere" actually worse? Measure pair-set delta + speed. --- + let f0 = Instant::now(); + let f32_pairs = cosine_join_gpu_f32(&corpus, jt, &gpu); + let f32_join_ms = f0.elapsed().as_secs_f64() * 1000.0; + + // a = the exact f64 pair set (sorted). Compare membership + worst score gap on shared pairs. + let f64_set: HashSet<(usize, usize)> = a.iter().map(|p| (p.0, p.1)).collect(); + let f32_set: HashSet<(usize, usize)> = f32_pairs.iter().map(|p| (p.0, p.1)).collect(); + let only_f64 = f64_set.difference(&f32_set).count(); + let only_f32 = f32_set.difference(&f64_set).count(); + let f64_score: std::collections::HashMap<(usize, usize), f64> = + a.iter().map(|p| ((p.0, p.1), p.2)).collect(); + let max_gap = f32_pairs + .iter() + .filter_map(|p| f64_score.get(&(p.0, p.1)).map(|&e| (f64::from(p.2) - e).abs())) + .fold(0.0f64, f64::max); + let diff = only_f64 + only_f32; + eprintln!( + "F32-ONLY t={jt} pairs={} | {f32_join_ms:.0}ms (vs CPU {cpu_join_ms:.0}ms = {:.2}x, \ + vs hybrid {hyb_join_ms:.0}ms = {:.2}x) | differing pairs: {diff} of {} ({:.4}%: \ + {only_f64} dropped, {only_f32} added) | max score gap on shared: {max_gap:.1e}", + f32_pairs.len(), + cpu_join_ms / f32_join_ms, + hyb_join_ms / f32_join_ms, + a.len(), + 100.0 * diff as f64 / a.len() as f64, + ); +} diff --git a/examples/simjoin_pypi.rs b/examples/simjoin_pypi.rs new file mode 100644 index 0000000..79b37aa --- /dev/null +++ b/examples/simjoin_pypi.rs @@ -0,0 +1,94 @@ +//! Real-data `simjoin` bench: load a binary Type-3 corpus (built by `scripts/simjoin-corpus.py` +//! from the top-300 PyPI snapshot) and time `cosine_join` on it. +//! +//! `cargo run --release --example simjoin_pypi -- [threshold] [reps]` +//! defaults: threshold=0.8 reps=3. Thread count via `RAYON_NUM_THREADS`. + +#![allow( + clippy::cast_possible_truncation, + clippy::cast_precision_loss, + clippy::many_single_char_names, + clippy::doc_markdown +)] + +use std::time::Instant; + +use difflib_fast::simjoin::{cosine_join, Corpus}; +#[cfg(feature = "profiling")] +use difflib_fast::simjoin::cosine_join_counts; + +fn le_u32(b: &[u8], p: usize) -> u32 { + u32::from_le_bytes(b[p..p + 4].try_into().unwrap()) +} + +fn arg(i: usize, def: T) -> T { + std::env::args().nth(i).and_then(|s| s.parse().ok()).unwrap_or(def) +} + +/// Parse the `SIMJOIN1` binary into raw `(dim, weight)` rows (weights kept as `f64`; `Corpus` +/// L2-normalises and merges duplicate dims). +fn load(path: &str) -> Vec> { + let b = std::fs::read(path).expect("read corpus"); + assert!(b.len() >= 16, "corpus too small"); + let magic = u64::from_le_bytes(b[0..8].try_into().unwrap()); + assert_eq!(magic, 0x5349_4D4A_4F49_4E31, "bad magic"); + let n = le_u32(&b, 8) as usize; + let mut p = 16usize; + let mut rows = Vec::with_capacity(n); + for _ in 0..n { + let nnz = le_u32(&b, p) as usize; + p += 4; + let mut row = Vec::with_capacity(nnz); + for _ in 0..nnz { + let d = le_u32(&b, p); + let w = f64::from(f32::from_le_bytes(b[p + 4..p + 8].try_into().unwrap())); + p += 8; + row.push((d, w)); + } + rows.push(row); + } + rows +} + +fn main() { + let path: String = arg(1, "perf-local/pypi-type3.simjoin.bin".to_string()); + let t: f64 = arg(2, 0.8); + let reps: usize = arg(3, 3); + + let rows = load(&path); + let n = rows.len(); + let nnz: usize = rows.iter().map(Vec::len).sum(); + + let b0 = Instant::now(); + let corpus = Corpus::from_rows(&rows); + let build_ms = b0.elapsed().as_secs_f64() * 1000.0; + + #[cfg(feature = "profiling")] + if std::env::var("STATS").is_ok() { + let (ncand, survivors, pairs) = cosine_join_counts(&corpus, t); + eprintln!( + "STATS n={n} t={t} | candidates={ncand} survivors(cos_full)={survivors} pairs={pairs} \ + | prune_pass={:.4} survivor_precision={:.3}", + survivors as f64 / ncand.max(1) as f64, + pairs as f64 / survivors.max(1) as f64, + ); + } + + let mut ms: Vec = Vec::with_capacity(reps); + let mut npairs = 0usize; + for _ in 0..reps { + let t0 = Instant::now(); + let pairs = cosine_join(&corpus, t); + ms.push(t0.elapsed().as_secs_f64() * 1000.0); + npairs = pairs.len(); + std::hint::black_box(&pairs); + } + ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + eprintln!( + "pypi-type3 n={n} nnz_total={nnz} mean_nnz={:.1} t={t} | build={build_ms:.0}ms | \ + join: min={:.1}ms median={:.1}ms | pairs={npairs}", + nnz as f64 / n as f64, + ms[0], + ms[reps / 2], + ); +} diff --git a/python/difflib_fast/__init__.py b/python/difflib_fast/__init__.py index 99d9fab..6752f55 100644 --- a/python/difflib_fast/__init__.py +++ b/python/difflib_fast/__init__.py @@ -13,14 +13,23 @@ from typing import overload from ._difflib_fast import ( + CosineJoiner, Rationer, cluster_canonicals, cluster_canonicals_lsh, + cosine_join, ratio as _ratio, ratio_many as _ratio_many, ) -__all__ = ["ratio", "cluster_canonicals", "cluster_canonicals_lsh", "Rationer"] +__all__ = [ + "ratio", + "cluster_canonicals", + "cluster_canonicals_lsh", + "Rationer", + "cosine_join", + "CosineJoiner", +] @overload diff --git a/python/difflib_fast/__init__.pyi b/python/difflib_fast/__init__.pyi index cc8d373..1e6d122 100644 --- a/python/difflib_fast/__init__.pyi +++ b/python/difflib_fast/__init__.pyi @@ -44,6 +44,49 @@ def cluster_canonicals_lsh( ``threads=N`` caps the pool to N workers for this call (``threads=0``, default, uses every core). """ +def cosine_join( + docs: list[list[str]], threshold: float, concurrency: str = "cpu", threads: int = 0 +) -> list[tuple[int, int, float]]: + """Exact all-pairs weighted-cosine similarity join over token documents. + + Each ``doc`` is a list of string tokens (e.g. a function's canonicalised lines); they're turned + into TF-IDF sparse vectors in Rust (dim = distinct token, weight = token-count × ``ln(n/df)``) and + every pair with cosine ``>= threshold`` is returned as ``(j, i, cos)`` with ``j < i``. Like + :func:`ratio`'s batch form, the work fans out across **all cores inside Rust with the GIL + released** — one call, full multicore, no ``ThreadPoolExecutor``. ``threads=N`` caps the pool to N + for this call; ``threads=0`` (default) uses every core (tunable via ``RAYON_NUM_THREADS``). + + ``concurrency`` ∈ ``"cpu" | "gpu" | "gpu+cpu"``: ``"cpu"`` is exact f64; ``"gpu+cpu"`` is the exact + f64 GPU-accelerated hybrid (byte-identical to ``"cpu"``); ``"gpu"`` runs the dot on the Metal GPU + in f32 (fastest; differs from exact only on pairs within ~1e-6 of the threshold). Off a macOS + ``gpu`` wheel, or with no Metal device, the GPU modes quietly fall back to CPU. For repeated joins + on one corpus use :class:`CosineJoiner` (builds the corpus / uploads to the GPU once). + """ + +class CosineJoiner: + """Stateful similarity-join handle: builds the TF-IDF corpus and (on a macOS ``gpu`` wheel) + acquires the Metal device + uploads the corpus once, then answers repeated joins reusing them. + + Use it to sweep thresholds — the free :func:`cosine_join` rebuilds everything per call. + """ + + def __init__(self, docs: list[list[str]]) -> None: + """Build a joiner over token documents (each a list of string tokens).""" + + def __len__(self) -> int: + """Number of documents in the corpus.""" + + @property + def has_gpu(self) -> bool: + """Whether a Metal GPU backend was acquired (always ``False`` off a macOS ``gpu`` wheel).""" + + def join( + self, threshold: float, concurrency: str = "cpu", threads: int = 0 + ) -> list[tuple[int, int, float]]: + """Join at ``threshold`` under ``concurrency`` (``"cpu" | "gpu" | "gpu+cpu"``), reusing the + handle's resources. Returns ``(j, i, cos)`` pairs with ``j < i``; fans out across all cores + with the GIL released (``threads=0`` = every core).""" + class Rationer: """Stateful similarity/clustering handle that owns the backend resources once and reuses them. diff --git a/src/lib.rs b/src/lib.rs index bfdd079..47232cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,12 +28,20 @@ use rayon::prelude::*; pub mod gestalt; pub use gestalt::gestalt_ratio; +// Exact all-pairs weighted-cosine similarity join (AllPairs/L2AP) — inverted index + prefix +// filtering. The principled replacement for shingle-candidate near-duplicate detection. +pub mod simjoin; + // Heterogeneous CPU+GPU exact RO — Metal compute backend (Apple Silicon). Behind `feature = "gpu"` // + `cfg(target_os = "macos")`; the rest of the crate falls back to the CPU path when it's off or // no Metal device is available at runtime. #[cfg(all(feature = "gpu", target_os = "macos"))] pub mod gpu; +// GPU batched sparse-cosine — the offload experiment for `simjoin`'s bandwidth-bound verify step. +#[cfg(all(feature = "gpu", target_os = "macos"))] +pub mod simjoin_gpu; + // `Rationer` — the stateful, GPU-accelerated similarity/clustering handle. Always available; // degrades to CPU when the `gpu` feature is off or no Metal device can be acquired. pub mod rationer; @@ -706,6 +714,19 @@ mod python { } } + /// Parse a `"cpu" | "gpu" | "gpu+cpu"` backend string into a [`Concurrency`](super::Concurrency). + fn parse_concurrency(s: &str) -> PyResult { + use pyo3::exceptions::PyValueError; + match s.to_ascii_lowercase().as_str() { + "cpu" => Ok(super::Concurrency::Cpu), + "gpu" => Ok(super::Concurrency::Gpu), + "gpu+cpu" | "gpucpu" | "gpu_cpu" => Ok(super::Concurrency::GpuPlusCpu), + other => Err(PyValueError::new_err(format!( + "unknown concurrency {other:?}; expected \"cpu\", \"gpu\", or \"gpu+cpu\"" + ))), + } + } + /// `ratio(a, b)` — fast exact `difflib.SequenceMatcher(None, a, b, autojunk=False).ratio()`. /// /// Releases the GIL for the compute (the inputs are copied to owned `String`s first), so calling @@ -769,17 +790,7 @@ mod python { #[new] #[pyo3(signature = (concurrency="gpu+cpu", threads=0, delta=0.0))] fn new(concurrency: &str, threads: usize, delta: f64) -> PyResult { - use pyo3::exceptions::PyValueError; - let c = match concurrency.to_ascii_lowercase().as_str() { - "cpu" => super::Concurrency::Cpu, - "gpu" => super::Concurrency::Gpu, - "gpu+cpu" | "gpucpu" | "gpu_cpu" => super::Concurrency::GpuPlusCpu, - other => { - return Err(PyValueError::new_err(format!( - "unknown concurrency {other:?}; expected \"cpu\", \"gpu\", or \"gpu+cpu\"" - ))) - } - }; + let c = parse_concurrency(concurrency)?; let mut b = super::Rationer::builder().concurrency(c).delta(delta); if threads > 0 { b = b.threads(threads); @@ -825,6 +836,82 @@ mod python { } } + /// `cosine_join(docs, threshold, concurrency="cpu", threads=0)` — exact all-pairs weighted-cosine + /// similarity join over **token documents**. Each `doc` is a list of string tokens (e.g. a + /// function's canonical lines); they're turned into TF-IDF sparse vectors in Rust and every pair + /// with cosine `>= threshold` is returned as `(j, i, cos)` with `j < i`. + /// + /// `concurrency` ∈ `"cpu" | "gpu" | "gpu+cpu"`: `"cpu"` is exact f64 everywhere; `"gpu+cpu"` is the + /// exact f64 GPU-accelerated hybrid (byte-identical to `"cpu"`); `"gpu"` runs the dot on the GPU in + /// f32 (fastest; differs from exact only on pairs within ~1e-6 of the threshold). On a wheel built + /// without the `gpu` feature, or with no Metal device, the GPU modes quietly fall back to CPU. The + /// GIL is released for the whole build+join. For repeated joins on one corpus, use `CosineJoiner`. + #[pyfunction] + #[pyo3(signature = (docs, threshold, concurrency="cpu", threads=0))] + #[allow(clippy::needless_pass_by_value)] + fn cosine_join( + py: Python<'_>, + docs: Vec>, + threshold: f64, + concurrency: &str, + threads: usize, + ) -> PyResult> { + let mode = parse_concurrency(concurrency)?; + Ok(py.detach(|| { + run_on_threads(threads, || { + let corpus = super::simjoin::Corpus::from_token_docs(&docs); + super::simjoin::cosine_join_with(&corpus, threshold, mode) + }) + })) + } + + /// `CosineJoiner(docs)` — stateful similarity-join handle that builds the TF-IDF corpus and (on a + /// macOS `gpu` wheel) acquires the Metal device + uploads the corpus **once**, then answers + /// repeated `join(threshold, concurrency=...)` calls reusing them. The free `cosine_join` rebuilds + /// everything per call; a `CosineJoiner` pays it once — use it to sweep thresholds. + #[pyclass(name = "CosineJoiner")] + struct PyCosineJoiner { + inner: super::simjoin::CosineJoiner, + } + + #[pymethods] + impl PyCosineJoiner { + #[new] + #[allow(clippy::needless_pass_by_value)] + fn new(py: Python<'_>, docs: Vec>) -> Self { + let inner = py.detach(|| { + super::simjoin::CosineJoiner::new(super::simjoin::Corpus::from_token_docs(&docs)) + }); + Self { inner } + } + + /// Number of documents in the corpus. + fn __len__(&self) -> usize { + self.inner.corpus().len() + } + + /// Whether a Metal GPU backend was acquired (always `False` off a macOS `gpu` wheel). When + /// `False`, every `join` runs on CPU regardless of the `concurrency` argument. + #[getter] + fn has_gpu(&self) -> bool { + self.inner.has_gpu() + } + + /// Join at `threshold` under `concurrency` (`"cpu" | "gpu" | "gpu+cpu"`), reusing the handle's + /// resources. Returns `(j, i, cos)` pairs with `j < i`. GIL released for the compute. + #[pyo3(signature = (threshold, concurrency="cpu", threads=0))] + fn join( + &self, + py: Python<'_>, + threshold: f64, + concurrency: &str, + threads: usize, + ) -> PyResult> { + let mode = parse_concurrency(concurrency)?; + Ok(py.detach(|| run_on_threads(threads, || self.inner.join(threshold, mode)))) + } + } + /// Compiled core of the `difflib_fast` Python package (re-exported by `difflib_fast/__init__.py`). #[pymodule] fn _difflib_fast(m: &Bound<'_, PyModule>) -> PyResult<()> { @@ -832,7 +919,9 @@ mod python { m.add_function(wrap_pyfunction!(ratio_many, m)?)?; m.add_function(wrap_pyfunction!(cluster_canonicals, m)?)?; m.add_function(wrap_pyfunction!(cluster_canonicals_lsh, m)?)?; + m.add_function(wrap_pyfunction!(cosine_join, m)?)?; m.add_class::()?; + m.add_class::()?; Ok(()) } } diff --git a/src/simjoin.rs b/src/simjoin.rs new file mode 100644 index 0000000..13057fa --- /dev/null +++ b/src/simjoin.rs @@ -0,0 +1,816 @@ +//! `simjoin` — exact all-pairs **weighted-cosine similarity join**: given a corpus of sparse +//! non-negative vectors, find every pair `(i, j)` whose cosine similarity is `≥ t`. +//! +//! This is the `AllPairs` / `L2AP` family (Bayardo et al. WWW'07; Anastasiu & Karypis ICDE'14): an +//! inverted index with **prefix filtering** so that the vast majority of vector pairs are never +//! compared. It is the principled, exact replacement for shingle-candidate + verify-all +//! near-duplicate detection (e.g. Type-3 code clones = functions × IDF-weighted lines). +//! +//! ## Correctness gate +//! [`cosine_join`] (the indexed algorithm) must return the **exact same** pair set, with +//! bit-identical similarities, as [`cosine_join_bruteforce`] (the naive `O(n²)` oracle). Both score +//! a pair with the same [`cos_full`] sorted-merge dot, so the values match to the bit and a pair is +//! never dropped or gained at the threshold. This equality is asserted on fuzzed corpora — the same +//! "two implementations, one answer" discipline the RO path uses. +//! +//! ## Method (this reference) +//! Vectors are L2-normalised (so `cos = dot`) and their dimensions relabelled to a global rank by +//! **increasing max weight** (common low-weight dims first, rare high-weight dims last — so only the +//! rare tail is indexed, keeping postings short). For a probe vector we look up candidates through +//! the index, then verify with the full dot. When *indexing* a vector we skip the leading prefix +//! whose max possible contribution to any dot stays `< t`: +//! `Σ_{k, + /// Relabelled (rank) dimension ids, ascending within each row. + dims: Vec, + /// L2-normalised weights, aligned with `dims`. + wts: Vec, + /// Per relabelled dim: the max weight it takes across the corpus (the prefix-filter bound). + maxw: Vec, +} + +impl Corpus { + /// Number of vectors. + #[must_use] + pub fn len(&self) -> usize { + self.n + } + + /// True if the corpus has no vectors. + #[must_use] + pub fn is_empty(&self) -> bool { + self.n == 0 + } + + /// `(dims, weights)` of vector `i` — dims ascending by global rank, weights L2-normalised. + #[must_use] + fn row(&self, i: usize) -> (&[u32], &[f64]) { + let (s, e) = (self.indptr[i], self.indptr[i + 1]); + (&self.dims[s..e], &self.wts[s..e]) + } + + /// CSR view for GPU offload: `(indptr, dims, wts_f32)`, with `indptr` cast to `u32` and the + /// L2-normalised weights cast to `f32` (Apple GPUs have no `f64`). For the + /// [`crate::simjoin_gpu`] throughput experiment only — the f32 cast means GPU dots are *not* + /// bit-identical to the CPU `f64` path. + #[cfg(all(target_os = "macos", feature = "gpu"))] + #[must_use] + pub fn csr_f32(&self) -> (Vec, Vec, Vec) { + let indptr = self.indptr.iter().map(|&x| x as u32).collect(); + let wts = self.wts.iter().map(|&w| w as f32).collect(); + (indptr, self.dims.clone(), wts) + } + + /// Build a corpus from **token documents** — each document a list of string tokens — as TF-IDF + /// sparse vectors: dim = a distinct token, weight = `(token count in doc) × ln(n / df_token)` + /// (`df` = number of documents containing the token). This is the principled input for a Type-3 + /// code-clone join (documents = functions, tokens = canonicalised lines) and the shape most + /// callers actually have. A token appearing in every document gets `idf = 0` (contributes + /// nothing), as expected. + #[must_use] + pub fn from_token_docs>(docs: &[Vec]) -> Corpus { + let n = docs.len(); + let mut dim: HashMap<&str, u32> = HashMap::new(); + let mut df: Vec = Vec::new(); + // Assign a dim id to each distinct token and count document frequency (once per doc/token). + let mut doc_ids: Vec> = Vec::with_capacity(n); + for doc in docs { + let mut ids = Vec::with_capacity(doc.len()); + let mut seen: HashSet = HashSet::new(); + for tok in doc { + let id = *dim.entry(tok.as_ref()).or_insert_with(|| { + let i = df.len() as u32; + df.push(0); + i + }); + ids.push(id); + if seen.insert(id) { + df[id as usize] += 1; + } + } + doc_ids.push(ids); + } + let idf: Vec = df.iter().map(|&d| (n as f64 / f64::from(d.max(1))).ln()).collect(); + // Emit (dim, idf) once per token occurrence; `from_rows` sums duplicates → tf·idf per dim. + let rows: Vec> = doc_ids + .iter() + .map(|ids| ids.iter().map(|&id| (id, idf[id as usize])).collect()) + .collect(); + Corpus::from_rows(&rows) + } + + /// Build a corpus from raw `(dim, weight)` rows. Duplicate dims within a row are summed; each row + /// is L2-normalised; dims are relabelled to a global rank by decreasing max weight. Weights are + /// expected non-negative (the prefix-filter bound requires it — IDF weights satisfy this). + #[must_use] + pub fn from_rows(rows: &[Vec<(u32, f64)>]) -> Corpus { + let n = rows.len(); + // 1. Merge duplicate dims + L2-normalise each row (kept as (orig_dim, weight)). + let normed: Vec> = rows + .iter() + .map(|r| { + let mut m: HashMap = HashMap::new(); + for &(d, w) in r { + *m.entry(d).or_insert(0.0) += w; + } + let norm = m.values().map(|w| w * w).sum::().sqrt(); + if norm > 0.0 { + m.into_iter().map(|(d, w)| (d, w / norm)).collect() + } else { + Vec::new() + } + }) + .collect(); + // 2. Max normalised weight per original dim. + let mut maxw_orig: HashMap = HashMap::new(); + for v in &normed { + for &(d, w) in v { + let e = maxw_orig.entry(d).or_insert(0.0); + if w > *e { + *e = w; + } + } + } + // 3. Rank dims by (max weight ASC, dim asc) → dense global order. Ascending so the common, + // low-weight dims land at the FRONT: they fill the un-indexed prefix (their tiny + // `w·maxw` keeps the cumulative bound under `t` for many of them), and only the rare, + // high-weight tail is indexed — short postings. (Reversing this indexes the common dims + // and their huge postings, which is correct but orders of magnitude slower.) + let mut dims_sorted: Vec<(u32, f64)> = maxw_orig.into_iter().collect(); + dims_sorted.sort_by(|a, b| { + a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal).then(a.0.cmp(&b.0)) + }); + let rank: HashMap = + dims_sorted.iter().enumerate().map(|(i, &(d, _))| (d, i as u32)).collect(); + let ndims = dims_sorted.len(); + let maxw: Vec = dims_sorted.iter().map(|&(_, w)| w).collect(); + // 4. CSR with relabelled dims, ascending within each row. + let mut indptr = Vec::with_capacity(n + 1); + indptr.push(0); + let mut dims = Vec::new(); + let mut wts = Vec::new(); + for v in &normed { + let mut rv: Vec<(u32, f64)> = v.iter().map(|&(d, w)| (rank[&d], w)).collect(); + rv.sort_unstable_by_key(|&(d, _)| d); + for (d, w) in rv { + dims.push(d); + wts.push(w); + } + indptr.push(dims.len()); + } + Corpus { n, ndims, indptr, dims, wts, maxw } + } +} + +/// Sorted-merge dot product of two rows (dims ascending). For L2-normalised non-negative vectors +/// this is exactly their cosine similarity. The single scoring routine shared by the indexed join +/// and the brute-force oracle, so both agree to the bit. +#[must_use] +#[cfg_attr(feature = "profiling", inline(never))] +fn cos_full((da, wa): (&[u32], &[f64]), (db, wb): (&[u32], &[f64])) -> f64 { + // Rows are equal-length dim/weight slices (built by `Corpus::from_rows`). + debug_assert_eq!(da.len(), wa.len()); + debug_assert_eq!(db.len(), wb.len()); + let (la, lb) = (da.len(), db.len()); + let (mut i, mut j) = (0usize, 0usize); + let mut s = 0.0f64; + // Branchless sorted-merge: the 3-way `cmp` branches mispredict on random dim order, and the + // weight loads carry bounds checks the optimiser can't elide. Here we always load both weights + // (`i= bj); + } + s +} + +/// Per-vector prune data, read in the hot verify loop when a vector appears as a candidate. Packed +/// into one array (not two parallel `Vec`s) so a candidate's `pnorm` and `split` come from a single +/// scattered cache line instead of two — verify is memory-latency-bound on these random accesses. +#[derive(Clone, Copy)] +struct Bound { + /// ‖un-indexed prefix of y‖₂ — Cauchy–Schwarz cap on the dot mass the accumulator misses (the + /// prefix dims of `y` were never indexed, so never accumulated). + pnorm: f64, + /// Rank of `y`'s first *indexed* dim (`u32::MAX` if `y` indexed nothing). Every prefix dim of + /// `y` has rank `<` this — lets the probe restrict its norm to that rank range. + split: u32, +} + +/// Per-vector data cached as each vector is indexed; read by the prune bound when the vector later +/// appears as a candidate. (One `Cached` for the whole join.) +struct Cached { + bound: Vec, +} + +/// Reused scratch buffers (allocated once for the whole join, not per probe). +struct Scratch { + /// `acc[y]` = partial dot of the probe with `y` over shared *indexed* dims; `-1.0` = untouched + /// sentinel (a real partial dot of non-negative weights is always `≥ 0`). + acc: Vec, + /// Candidate ids the current probe touched (the keys to reset in `acc`). + touched: Vec, + /// Probe prefix L2 norms for this probe: `xpn[k] = ‖wi[..k]‖₂`, length `nnz+1`. + xpn: Vec, +} + +/// Exact all-pairs cosine join via inverted index + **L2AP** prefix filtering and accumulation-time +/// pruning. Returns `(j, i, cos)` with `j < i` for every pair with `cos ≥ t`. Bit-identical pair set +/// to [`cosine_join_bruteforce`]. +/// +/// For each probe we accumulate a partial dot over shared *indexed* dims ([`accumulate`]), then for +/// each touched candidate compute a Cauchy–Schwarz upper bound on the true cosine and skip the exact +/// [`cos_full`] when it cannot reach `t` ([`verify_pruned`]). The bound is a filter only — survivors +/// are scored exactly, so the output is byte-for-byte the brute-force result. On skewed data the +/// bound prunes the ~99.9 % of candidates that collide on a single rare dim, so `cos_full` (the +/// former 90 % hotspot) runs only on genuine near-matches. +/// +/// The full inverted index is built once (postings ascending by id), then every vector is probed in +/// **parallel**: probe `i` walks each posting only while `y < i` (postings are id-sorted), so it sees +/// exactly the earlier vectors — each pair `(j, i)` with `j < i` is found once, from the larger id. +/// This is the same candidate set the sequential index-as-you-go build produces, so the result is +/// unchanged; the returned `Vec` is in arbitrary order (sort if a canonical order is needed). +#[must_use] +pub fn cosine_join(c: &Corpus, t: f64) -> Vec<(usize, usize, f64)> { + let n = c.n; + // Postings carry the indexed weight `(y, w_y[d])` so the scan can accumulate a partial dot. + let mut index: Vec> = vec![Vec::new(); c.ndims]; + let mut cached = Cached { bound: vec![Bound { pnorm: 0.0, split: u32::MAX }; n] }; + for i in 0..n { + let (di, wi) = c.row(i); + index_suffix(c, i, (di, wi), t, &mut index, &mut cached); + } + // Probe vectors in parallel; each worker keeps one reusable `Scratch` (an `n`-wide accumulator). + // `with_min_len` batches many probes per rayon task so the per-probe work (tiny) isn't dwarfed by + // task-splitting / scheduling overhead (`swtch_pri` in the profile). + (0..n) + .into_par_iter() + .with_min_len(256) + .map_init( + || Scratch { acc: vec![-1.0; n], touched: Vec::new(), xpn: Vec::new() }, + |scratch, i| { + let (di, wi) = c.row(i); + accumulate(&index, (di, wi), i as u32, scratch); + let mut out = Vec::new(); + verify_pruned(c, i, t, scratch, &cached, &mut out); + out + }, + ) + .flatten() + .collect() +} + +/// Run the cosine join under a chosen [`Concurrency`] backend. Returns `(j, i, cos)` pairs with +/// `j < i` and `cos ≥ t`, scores as `f64` (the `Gpu` mode's f32 cosines are widened losslessly). +/// +/// - [`Concurrency::Cpu`] — [`cosine_join`]: exact `f64`, all-CPU, every platform. +/// - [`Concurrency::GpuPlusCpu`] — exact `f64` hybrid: CPU generates survivor pairs, the GPU f32 +/// cosine *filters* the clear rejects, the CPU recomputes the exact `f64` score on what passes. +/// **Byte-identical to `Cpu`**; both engines fully used. ~1.7–2× on bandwidth-bound real data. +/// - [`Concurrency::Gpu`] — GPU-dominant `f32`: CPU generates survivor pairs, the GPU scores them and +/// the result is emitted directly (no f64 re-verify). Fastest (~2×); differs from the exact answer +/// only on pairs whose true cosine is within ~`1e-6` of `t` (measured: ≤1 pair in millions). +/// +/// When the `gpu` feature is off, the target isn't macOS, or no Metal device can be acquired, the GPU +/// modes transparently fall back to [`cosine_join`] (same as `Rationer`). This convenience entry +/// **compiles + uploads the GPU corpus on every call** — fine for a one-shot join, but for repeated +/// joins on one corpus build a [`CosineJoiner`] once and call [`CosineJoiner::join`], which holds the +/// device + kernel + uploaded CSR across calls (and avoids the driver instability of compiling a +/// Metal library hundreds of times in a tight loop). +#[must_use] +pub fn cosine_join_with(c: &Corpus, t: f64, mode: Concurrency) -> Vec<(usize, usize, f64)> { + #[cfg(all(feature = "gpu", target_os = "macos"))] + { + if matches!(mode, Concurrency::Cpu) { + return cosine_join(c, t); + } + let (indptr, dims, wts) = c.csr_f32(); + let Some(gpu) = crate::simjoin_gpu::BatchCosineGpu::new(&indptr, &dims, &wts) else { + return cosine_join(c, t); // no Metal device → CPU fallback + }; + match mode { + Concurrency::GpuPlusCpu => cosine_join_gpu(c, t, &gpu), + // `Gpu`: emit the GPU f32 cosines directly (widened to f64), no exact re-verify. + Concurrency::Gpu => cosine_join_gpu_f32(c, t, &gpu) + .into_iter() + .map(|(a, b, s)| (a, b, f64::from(s))) + .collect(), + Concurrency::Cpu => unreachable!("handled above"), + } + } + #[cfg(not(all(feature = "gpu", target_os = "macos")))] + { + let _ = mode; // GPU modes degrade to the CPU join when the feature is off / not macOS. + cosine_join(c, t) + } +} + +/// A reusable cosine-join handle that owns the corpus and — under `feature = "gpu"` on macOS — the +/// Metal device, compiled `batch_cosine` kernel, and the corpus CSR uploaded to unified memory, all +/// acquired **once** at construction. Repeated [`join`](CosineJoiner::join)s at different thresholds +/// then skip the per-call kernel compile + CSR upload that [`cosine_join_with`] pays (only the +/// `t`-specific inverted index is rebuilt each call, on the CPU). Always constructible; degrades to +/// the pure-CPU join when the `gpu` feature is off or no Metal device is available — mirroring +/// `Rationer`. This is the right entry point for sweeping thresholds or joining repeatedly. +pub struct CosineJoiner { + corpus: Corpus, + /// Owned GPU resources (device + kernel + CSR in UMA); `None` when no Metal device was acquired. + #[cfg(all(feature = "gpu", target_os = "macos"))] + gpu: Option, +} + +impl CosineJoiner { + /// Build a joiner over `corpus`, acquiring the GPU device + uploading the corpus CSR once if the + /// `gpu` feature is on and a Metal device is present. + #[must_use] + pub fn new(corpus: Corpus) -> Self { + #[cfg(all(feature = "gpu", target_os = "macos"))] + { + let (indptr, dims, wts) = corpus.csr_f32(); + let gpu = crate::simjoin_gpu::BatchCosineGpu::new(&indptr, &dims, &wts); + Self { corpus, gpu } + } + #[cfg(not(all(feature = "gpu", target_os = "macos")))] + { + Self { corpus } + } + } + + /// The owned corpus (e.g. for `len()` or to run other queries against it). + #[must_use] + pub fn corpus(&self) -> &Corpus { + &self.corpus + } + + /// Whether a Metal GPU backend was acquired. Always `false` without `feature = "gpu"` on macOS; + /// when `false`, every [`join`](CosineJoiner::join) runs on the CPU regardless of `mode`. + #[must_use] + pub fn has_gpu(&self) -> bool { + #[cfg(all(feature = "gpu", target_os = "macos"))] + { + self.gpu.is_some() + } + #[cfg(not(all(feature = "gpu", target_os = "macos")))] + { + false + } + } + + /// Run the join at threshold `t` under `mode`, reusing the handle's GPU resources. Returns the + /// same results as [`cosine_join_with`] (Cpu/GpuPlusCpu exact, Gpu f32→f64); falls back to the + /// CPU join when the GPU is unavailable. + #[must_use] + pub fn join(&self, t: f64, mode: Concurrency) -> Vec<(usize, usize, f64)> { + #[cfg(all(feature = "gpu", target_os = "macos"))] + { + match (mode, self.gpu.as_ref()) { + (Concurrency::GpuPlusCpu, Some(g)) => cosine_join_gpu(&self.corpus, t, g), + (Concurrency::Gpu, Some(g)) => cosine_join_gpu_f32(&self.corpus, t, g) + .into_iter() + .map(|(a, b, s)| (a, b, f64::from(s))) + .collect(), + _ => cosine_join(&self.corpus, t), // Cpu mode, or no Metal device + } + } + #[cfg(not(all(feature = "gpu", target_os = "macos")))] + { + let _ = mode; + cosine_join(&self.corpus, t) + } + } +} + +/// FP slack for the prune bound: the Cauchy–Schwarz upper bound holds in exact arithmetic, but the +/// accumulated dot and the `sqrt` norms each carry rounding error. We only *skip* `cos_full` when +/// the bound is below `t` by more than this slack, so a true pair (exact cosine `≥ t`) is never +/// pruned. Not skipping is always correctness-safe (just a wasted `cos_full`), so the slack trades a +/// negligible number of extra verifies for safety and never changes the emitted pair set. +/// `1e-9 ≫` the `~1e-15` accumulated error over ~15 terms. +const PRUNE_SLACK: f64 = 1e-9; + +/// Phase 1 — accumulate: for each indexed dim of the probe, add `w_probe·w_y` into `acc[y]` for +/// every **earlier** `y` (`y < cutoff`, the probe's own id) indexing that dim. Postings are id-sorted, +/// so we `break` at the first `y ≥ cutoff`. Leaves `acc` `-1.0` everywhere except the touched ids +/// (listed in `touched`, reset in [`verify_pruned`]). One scattered `acc[]` FMA per posting. +#[cfg_attr(feature = "profiling", inline(never))] +fn accumulate(index: &[Vec<(u32, f64)>], (di, wi): (&[u32], &[f64]), cutoff: u32, s: &mut Scratch) { + s.touched.clear(); + for (&d, &w) in di.iter().zip(wi) { + for &(y, wy) in &index[d as usize] { + if y >= cutoff { + break; + } + let yu = y as usize; + // SAFETY: `y` is a vector id pushed by `index_suffix`, so `yu < n == acc.len()`. + let a = unsafe { s.acc.get_unchecked_mut(yu) }; + if *a < 0.0 { + *a = 0.0; + s.touched.push(y); + } + *a += w * wy; + } + } +} + +/// Phase 2 — prune + verify. For each touched candidate `y`, reset its accumulator and test the +/// **L2AP `l2` bound**: the dot mass missing from `acc[y]` (the dims in `prefix(y)`) is at most +/// `‖x_{rank, +) { + let (di, wi) = c.row(i); + // Probe prefix L2 norms: xpn[k] = ‖wi[..k]‖₂ (di ascending by rank, so xpn[k] = norm over the + // probe's k lowest-rank dims). One sqrt per probe dim, reused across all its candidates. + s.xpn.clear(); + s.xpn.push(0.0); + let mut sq = 0.0f64; + for &w in wi { + sq += w * w; + s.xpn.push(sq.sqrt()); + } + let need = t - PRUNE_SLACK; + let Scratch { acc, touched, xpn } = s; + // (Software-prefetching the candidate row a few ahead was tried + reverted: no measurable change + // — with all cores gathering at once the join is memory-*bandwidth*-bound, not per-access + // latency-bound, so prefetch can't add throughput.) + for &y in touched.iter() { + let yu = y as usize; + // SAFETY: `yu < n` (same provenance as in `accumulate`). + let a = unsafe { std::mem::replace(acc.get_unchecked_mut(yu), -1.0) }; + // SAFETY: `yu < n`. One scattered load fetches both prune fields. + let bd = unsafe { *cached.bound.get_unchecked(yu) }; + // Number of probe dims with rank < split[y] → index into xpn (di sorted ascending). + let kstar = di.partition_point(|&d| d < bd.split); + // SAFETY: kstar ≤ di.len() == wi.len() = xpn.len()-1. + // (An added maxweight cap `min(…, Σ wx·maxw)` was tried + reverted: never tighter than the + // L2 cap on either synthetic or real data — it sums over all probe-prefix dims, not just the + // shared ones — so it pruned nothing and cost ~16% on the real PyPI corpus.) + let bound = a + unsafe { xpn.get_unchecked(kstar) } * bd.pnorm; + if bound >= need { + let cos = cos_full((di, wi), c.row(yu)); + if cos >= t { + out.push((yu, i, cos)); + } + } + } +} + +/// Phase 3 — index this vector's suffix: skip the leading prefix whose max possible contribution to +/// any dot stays `< t` (`Σ w_k·maxw[dim_k] < t`), index only the rarer tail (short postings = the +/// whole speed-up), and cache `pnorm[i] = ‖prefix‖₂` and `split[i]` = first indexed rank for the +/// [`verify_pruned`] bound. +#[cfg_attr(feature = "profiling", inline(never))] +fn index_suffix( + c: &Corpus, + i: usize, + (di, wi): (&[u32], &[f64]), + t: f64, + index: &mut [Vec<(u32, f64)>], + cached: &mut Cached, +) { + // Largest safe prefix under the maxweight bound: `Σ_{k= t { + break; + } + rs += bound; + b = k + 1; + } + let mut p = 0.0f64; + for &w in &wi[..b] { + p += w * w; + } + cached.bound[i] = Bound { + pnorm: p.sqrt(), + split: if b < di.len() { di[b] } else { u32::MAX }, + }; + for k in b..di.len() { + index[di[k] as usize].push((i as u32, wi[k])); + } +} + +/// FP margin for the GPU f32 cosine *filter* in [`cosine_join_gpu`]: a survivor is dropped only when +/// its GPU f32 cosine is below `t` by more than this. The f32 dot's error is `~1e-6` relative, so a +/// `1e-4` absolute margin never drops a true pair; the CPU then recomputes the exact `f64` score on +/// whatever passes, so the emitted pair set + scores stay bit-identical to [`cosine_join`]. +#[cfg(all(target_os = "macos", feature = "gpu"))] +const GPU_FILTER_MARGIN: f64 = 1e-4; + +/// Like the verify half of [`verify_pruned`], but instead of scoring, **collects** each surviving +/// `(candidate, probe)` pair (`candidate < probe`) for batch scoring elsewhere. The bound here MUST +/// stay identical to `verify_pruned`'s so the survivor set matches exactly. +#[cfg(all(target_os = "macos", feature = "gpu"))] +fn collect_survivors(c: &Corpus, i: usize, t: f64, s: &mut Scratch, cached: &Cached, out: &mut Vec<(u32, u32)>) { + let (di, wi) = c.row(i); + s.xpn.clear(); + s.xpn.push(0.0); + let mut sq = 0.0f64; + for &w in wi { + sq += w * w; + s.xpn.push(sq.sqrt()); + } + let need = t - PRUNE_SLACK; + let Scratch { acc, touched, xpn } = s; + for &y in touched.iter() { + let yu = y as usize; + // SAFETY: `yu < n` (same provenance as in `accumulate`). + let a = unsafe { std::mem::replace(acc.get_unchecked_mut(yu), -1.0) }; + let bd = unsafe { *cached.bound.get_unchecked(yu) }; + let kstar = di.partition_point(|&d| d < bd.split); + let bound = a + unsafe { xpn.get_unchecked(kstar) } * bd.pnorm; + if bound >= need { + out.push((y, i as u32)); // (candidate, probe), candidate < probe + } + } +} + +/// **CPU+GPU hybrid join** (feature `gpu`, macOS). Returns the **exact same** pair set + scores as +/// [`cosine_join`] (and thus the brute-force oracle) — only faster when the verify is bandwidth-bound. +/// +/// Pipeline: CPU builds the index and, in parallel, accumulates + bounds every probe to a list of +/// surviving `(candidate, probe)` pairs (no `cos_full`). The GPU then computes an f32 cosine for the +/// whole batch (its memory-level parallelism clears the random-gather dots ~3× faster than the CPU), +/// and the CPU recomputes the exact `f64` `cos_full` **only** on the pairs whose GPU score clears +/// `t − margin` — typically a few percent of survivors. Because the GPU is a *conservative filter* +/// (margin ≫ f32 error, so no true pair is ever dropped) and every emitted score is the exact CPU +/// `f64` value, the output is byte-for-byte identical to [`cosine_join`]. +#[cfg(all(target_os = "macos", feature = "gpu"))] +#[must_use] +pub fn cosine_join_gpu( + c: &Corpus, + t: f64, + gpu: &crate::simjoin_gpu::BatchCosineGpu, +) -> Vec<(usize, usize, f64)> { + let (pa, pb) = survivor_pairs(c, t); + if pa.is_empty() { + return Vec::new(); + } + // GPU phase: f32 cosine over the whole survivor batch (conservative filter). + let gcos = gpu.cosine_batch(&pa, &pb); + let need = t - GPU_FILTER_MARGIN; + // CPU phase: exact f64 re-verify only on pairs the GPU filter passes. + (0..pa.len()) + .into_par_iter() + .with_min_len(1024) + .filter_map(|k| { + if f64::from(gcos[k]) < need { + return None; + } + let (a, b) = (pa[k] as usize, pb[k] as usize); + let cos = cos_full(c.row(a), c.row(b)); + (cos >= t).then_some((a, b, cos)) + }) + .collect() +} + +/// **Pure-f32** CPU+GPU join (feature `gpu`, macOS): same survivor generation as [`cosine_join_gpu`] +/// but emits the GPU's **f32** cosine directly, with **no exact f64 re-verify**. Trades byte-parity +/// for speed (no re-verify, and `cos_full` never runs on the GPU survivors). The result differs from +/// [`cosine_join`] only on pairs whose true cosine lies within ~`1e-6` (f32 rounding) of `t` — for a +/// similarity join with an arbitrary threshold that is immaterial. Use when an ε-exact answer is +/// acceptable; use [`cosine_join_gpu`] when bit-exactness is required. +#[cfg(all(target_os = "macos", feature = "gpu"))] +#[must_use] +pub fn cosine_join_gpu_f32( + c: &Corpus, + t: f64, + gpu: &crate::simjoin_gpu::BatchCosineGpu, +) -> Vec<(usize, usize, f32)> { + let (pa, pb) = survivor_pairs(c, t); + if pa.is_empty() { + return Vec::new(); + } + let gcos = gpu.cosine_batch(&pa, &pb); + let tf = t as f32; + (0..pa.len()) + .into_par_iter() + .with_min_len(1024) + .filter_map(|k| (gcos[k] >= tf).then_some((pa[k] as usize, pb[k] as usize, gcos[k]))) + .collect() +} + +/// CPU half shared by the GPU joins: build the index, then accumulate + bound every probe in +/// parallel to the list of surviving `(candidate, probe)` pairs (candidate `<` probe), split into +/// two `u32` arrays ready for [`crate::simjoin_gpu::BatchCosineGpu::cosine_batch`]. +#[cfg(all(target_os = "macos", feature = "gpu"))] +fn survivor_pairs(c: &Corpus, t: f64) -> (Vec, Vec) { + let n = c.n; + let mut index: Vec> = vec![Vec::new(); c.ndims]; + let mut cached = Cached { bound: vec![Bound { pnorm: 0.0, split: u32::MAX }; n] }; + for i in 0..n { + let (di, wi) = c.row(i); + index_suffix(c, i, (di, wi), t, &mut index, &mut cached); + } + let pairs: Vec<(u32, u32)> = (0..n) + .into_par_iter() + .with_min_len(256) + .map_init( + || Scratch { acc: vec![-1.0; n], touched: Vec::new(), xpn: Vec::new() }, + |scratch, i| { + let (di, wi) = c.row(i); + accumulate(&index, (di, wi), i as u32, scratch); + let mut out = Vec::new(); + collect_survivors(c, i, t, scratch, &cached, &mut out); + out + }, + ) + .flatten() + .collect(); + pairs.into_iter().unzip() +} + +/// Diagnostic (feature `profiling`, off the hot path): counts that quantify the prune. Returns +/// `(candidates, survivors, pairs)` — candidates touched by the accumulator, survivors that pass the +/// Cauchy–Schwarz bound (i.e. the `cos_full` calls actually made), and real pairs. `survivors / +/// candidates` is the prune pass-rate (lower = better); `survivors` is the verify volume we pay for. +#[cfg(feature = "profiling")] +#[must_use] +pub fn cosine_join_counts(c: &Corpus, t: f64) -> (u64, u64, u64) { + let n = c.n; + let mut index: Vec> = vec![Vec::new(); c.ndims]; + let mut cached = Cached { bound: vec![Bound { pnorm: 0.0, split: u32::MAX }; n] }; + for i in 0..n { + let (di, wi) = c.row(i); + index_suffix(c, i, (di, wi), t, &mut index, &mut cached); + } + let mut s = Scratch { acc: vec![-1.0; n], touched: Vec::new(), xpn: Vec::new() }; + let (mut ncand, mut survivors, mut pairs) = (0u64, 0u64, 0u64); + let need = t - PRUNE_SLACK; + for i in 0..n { + let (di, wi) = c.row(i); + accumulate(&index, (di, wi), i as u32, &mut s); + ncand += s.touched.len() as u64; + s.xpn.clear(); + s.xpn.push(0.0); + let mut sq = 0.0f64; + for &w in wi { + sq += w * w; + s.xpn.push(sq.sqrt()); + } + for &y in &s.touched { + let yu = y as usize; + let a = std::mem::replace(&mut s.acc[yu], -1.0); + let bd = cached.bound[yu]; + let kstar = di.partition_point(|&d| d < bd.split); + if a + s.xpn[kstar] * bd.pnorm >= need { + survivors += 1; + if cos_full((di, wi), c.row(yu)) >= t { + pairs += 1; + } + } + } + } + (ncand, survivors, pairs) +} + +/// Naive `O(n²)` oracle: score every pair with [`cos_full`], keep `cos ≥ t`. The correctness +/// reference [`cosine_join`] is validated against. +#[must_use] +pub fn cosine_join_bruteforce(c: &Corpus, t: f64) -> Vec<(usize, usize, f64)> { + let mut out: Vec<(usize, usize, f64)> = Vec::new(); + for i in 0..c.n { + for j in 0..i { + let s = cos_full(c.row(i), c.row(j)); + if s >= t { + out.push((j, i, s)); + } + } + } + out +} + +#[cfg(test)] +mod tests { + use super::{cosine_join, cosine_join_bruteforce, Corpus}; + + fn xorshift(seed: u64) -> impl FnMut() -> u64 { + let mut s = seed; + move || { + s ^= s << 13; + s ^= s >> 7; + s ^= s << 17; + s + } + } + + fn sort_pairs(mut v: Vec<(usize, usize, f64)>) -> Vec<(usize, usize, u64)> { + v.sort_by_key(|a| (a.0, a.1)); + v.into_iter().map(|(a, b, s)| (a, b, s.to_bits())).collect() + } + + #[test] + fn indexed_join_matches_bruteforce() { + let mut next = xorshift(0x9e37_79b9_7f4a_7c15); + for _ in 0..400 { + let n = (next() % 40 + 2) as usize; + let dim_space = next() % 15 + 1; + let rows: Vec> = (0..n) + .map(|_| { + let nnz = (next() % 8) as usize; + (0..nnz) + .map(|_| ((next() % dim_space) as u32, (next() % 10 + 1) as f64)) + .collect() + }) + .collect(); + let c = Corpus::from_rows(&rows); + for &t in &[0.1_f64, 0.25, 0.5, 0.75, 0.9, 1.0] { + let got = sort_pairs(cosine_join(&c, t)); + let want = sort_pairs(cosine_join_bruteforce(&c, t)); + assert_eq!(got, want, "n={n} t={t}"); + } + } + } + + /// The CPU+GPU hybrid [`super::cosine_join_gpu`] must return bit-identical results to the pure-CPU + /// [`cosine_join`] on fuzzed corpora — the GPU is only a conservative filter; every emitted score + /// is the exact CPU `f64` value. Skips when no Metal device is present. + #[cfg(all(feature = "gpu", target_os = "macos"))] + #[test] + fn gpu_hybrid_matches_cpu() { + use super::CosineJoiner; + use crate::Concurrency; + let mut next = xorshift(0x1357_9bdf_0246_8ace); + for _ in 0..60 { + let n = (next() % 60 + 4) as usize; + let dim_space = next() % 20 + 2; + let rows: Vec> = (0..n) + .map(|_| { + let nnz = (next() % 10) as usize; + (0..nnz) + .map(|_| ((next() % dim_space) as u32, (next() % 10 + 1) as f64)) + .collect() + }) + .collect(); + // One reusable handle per corpus — `join` is called repeatedly across thresholds, which + // also exercises that the handle reuses its GPU resources (no per-call library compile). + let joiner = CosineJoiner::new(Corpus::from_rows(&rows)); + if !joiner.has_gpu() { + eprintln!("no Metal device — skipping gpu_hybrid_matches_cpu"); + return; + } + for &t in &[0.1_f64, 0.3, 0.5, 0.7, 0.9, 1.0] { + let want = sort_pairs(cosine_join(joiner.corpus(), t)); + // Exact GPU+CPU hybrid is byte-identical to the plain join; `Cpu` mode too. + assert_eq!( + sort_pairs(joiner.join(t, Concurrency::GpuPlusCpu)), + want, + "GpuPlusCpu n={n} t={t}" + ); + assert_eq!(sort_pairs(joiner.join(t, Concurrency::Cpu)), want, "Cpu n={n} t={t}"); + } + } + } +} diff --git a/src/simjoin_gpu.rs b/src/simjoin_gpu.rs new file mode 100644 index 0000000..e610a60 --- /dev/null +++ b/src/simjoin_gpu.rs @@ -0,0 +1,170 @@ +//! GPU (Metal) batched sparse-cosine — the offload experiment for [`crate::simjoin`]. +//! +//! The CPU L2AP join is **memory-bandwidth-bound** on its exact-verify step: ~10⁸ candidate pairs, +//! each an `O(nnz)` sorted-merge dot that gathers two random CSR rows. This module tests whether the +//! Apple GPU — which sustains far more in-flight memory requests against the same unified-memory pool +//! — clears those dot-products faster. One GPU thread per pair walks the two rows and writes `cos`. +//! +//! **Parity caveat (important):** Metal has no `f64` (Apple GPUs are 32-bit float only), so the GPU +//! dot is computed in `f32` and is *not* bit-identical to the CPU `f64` `cos_full`. The GPU result is +//! therefore only usable as an **approximate, conservative filter** (CPU re-verifies survivors +//! exactly to preserve the byte-for-byte gate) — never as the emitted score. This module exists to +//! measure the throughput question; wiring it as a filter is gated on it actually being faster. +#![allow(clippy::cast_possible_truncation, clippy::doc_markdown, clippy::similar_names)] + +use std::ffi::c_void; + +use metal::{ + Buffer, CommandQueue, ComputePipelineState, Device, Library, MTLResourceOptions, MTLSize, + NSUInteger, +}; + +/// MSL: one thread per pair, sorted-merge dot of two CSR rows (f32). Rows are dim-ascending, so the +/// merge mirrors the CPU `cos_full`. +const KERNEL_SRC: &str = r" +#include +using namespace metal; +kernel void batch_cosine( + device const uint* indptr [[buffer(0)]], + device const uint* dims [[buffer(1)]], + device const float* wts [[buffer(2)]], + device const uint* pa [[buffer(3)]], + device const uint* pb [[buffer(4)]], + device float* out [[buffer(5)]], + constant uint& npairs [[buffer(6)]], + uint tid [[thread_position_in_grid]] +) { + if (tid >= npairs) return; + uint a = pa[tid], b = pb[tid]; + uint ia = indptr[a], ea = indptr[a + 1]; + uint ib = indptr[b], eb = indptr[b + 1]; + float s = 0.0f; + while (ia < ea && ib < eb) { + uint da = dims[ia], db = dims[ib]; + if (da == db) { s += wts[ia] * wts[ib]; ia++; ib++; } + else if (da < db) { ia++; } + else { ib++; } + } + out[tid] = s; +} +"; + +/// A CSR corpus resident in unified memory + the compiled `batch_cosine` pipeline. +pub struct BatchCosineGpu { + device: Device, + queue: CommandQueue, + _library: Library, + pipeline: ComputePipelineState, + indptr: Buffer, + dims: Buffer, + wts: Buffer, + n: usize, +} + +// SAFETY: Metal device/queue/library/pipeline/buffers are documented thread-safe (see `gpu.rs`). +unsafe impl Send for BatchCosineGpu {} +unsafe impl Sync for BatchCosineGpu {} + +impl BatchCosineGpu { + /// Acquire the default Metal device, compile the kernel, and upload the CSR corpus (`indptr` + /// length `n+1`, `dims`/`wts` length `nnz`) into UMA once. Returns `None` if no Metal device or + /// the kernel fails to compile. + #[must_use] + pub fn new(indptr: &[u32], dims: &[u32], wts: &[f32]) -> Option { + let device = Device::system_default()?; + let queue = device.new_command_queue(); + let options = metal::CompileOptions::new(); + options.set_fast_math_enabled(true); + let library = match device.new_library_with_source(KERNEL_SRC, &options) { + Ok(l) => l, + Err(e) => { + eprintln!("simjoin_gpu: kernel compile failed: {e}"); + return None; + } + }; + let func = library.get_function("batch_cosine", None).ok()?; + let pipeline = device.new_compute_pipeline_state_with_function(&func).ok()?; + let indptr_buf = upload(&device, indptr); + let dims_buf = upload(&device, dims); + let wts_buf = upload(&device, wts); + Some(BatchCosineGpu { + device, + queue, + _library: library, + pipeline, + indptr: indptr_buf, + dims: dims_buf, + wts: wts_buf, + n: indptr.len().saturating_sub(1), + }) + } + + /// Device name (e.g. "Apple M3 Pro"). + #[must_use] + pub fn device_name(&self) -> String { + self.device.name().to_string() + } + + /// Number of vectors in the resident corpus. + #[must_use] + pub fn len(&self) -> usize { + self.n + } + + /// True if the resident corpus is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.n == 0 + } + + /// Compute `cos(row pa[t], row pb[t])` (f32 sparse dot) for every pair `t`, on the GPU. + /// `pa` and `pb` must be equal length with all ids `< len()`. + /// + /// # Panics + /// + /// Panics if `pa.len() != pb.len()`. + #[must_use] + pub fn cosine_batch(&self, pa: &[u32], pb: &[u32]) -> Vec { + assert_eq!(pa.len(), pb.len(), "pair arrays must match length"); + let np = pa.len(); + if np == 0 { + return Vec::new(); + } + let buf_pa = upload(&self.device, pa); + let buf_pb = upload(&self.device, pb); + let out_bytes = (np * std::mem::size_of::()) as NSUInteger; + let buf_out = self.device.new_buffer(out_bytes, MTLResourceOptions::StorageModeShared); + let np_u32 = np as u32; + let buf_np = self.device.new_buffer_with_data( + (&raw const np_u32).cast::(), + std::mem::size_of::() as NSUInteger, + MTLResourceOptions::StorageModeShared, + ); + + let cmd = self.queue.new_command_buffer(); + let enc = cmd.new_compute_command_encoder(); + enc.set_compute_pipeline_state(&self.pipeline); + for (i, b) in [&self.indptr, &self.dims, &self.wts, &buf_pa, &buf_pb, &buf_out, &buf_np] + .into_iter() + .enumerate() + { + enc.set_buffer(i as u64, Some(b), 0); + } + let max_t = self.pipeline.max_total_threads_per_threadgroup() as usize; + let tg = max_t.min(np).max(1); + enc.dispatch_threads(MTLSize::new(np as u64, 1, 1), MTLSize::new(tg as u64, 1, 1)); + enc.end_encoding(); + cmd.commit(); + cmd.wait_until_completed(); + + let ptr = buf_out.contents().cast::(); + // SAFETY: `buf_out` holds `np` f32s the kernel filled; it outlives this read. + unsafe { std::slice::from_raw_parts(ptr, np).to_vec() } + } +} + +/// Copy `data` into a fresh shared (UMA) buffer — one memcpy into unified memory (zero device copy). +fn upload(device: &Device, data: &[T]) -> Buffer { + let bytes = (std::mem::size_of_val(data) as NSUInteger).max(1); + device.new_buffer_with_data(data.as_ptr().cast::(), bytes, MTLResourceOptions::StorageModeShared) +}