From 03723d26a92e2338a08cf972c618f4c72843c5ab Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Fri, 1 May 2026 16:30:14 +0100 Subject: [PATCH 1/7] cache Signed-off-by: Baris Palaska --- vortex-array/Cargo.toml | 4 + vortex-array/benches/scalar_at_patches.rs | 140 ++++++++++++++++++++++ vortex-array/src/patches.rs | 127 +++++++++++++++++++- 3 files changed, 267 insertions(+), 4 deletions(-) create mode 100644 vortex-array/benches/scalar_at_patches.rs diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index f8676d76ef0..5489d22cfb9 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -124,6 +124,10 @@ harness = false name = "scalar_at_struct" harness = false +[[bench]] +name = "scalar_at_patches" +harness = false + [[bench]] name = "varbinview_compact" harness = false diff --git a/vortex-array/benches/scalar_at_patches.rs b/vortex-array/benches/scalar_at_patches.rs new file mode 100644 index 00000000000..c49aeeaa6c5 --- /dev/null +++ b/vortex-array/benches/scalar_at_patches.rs @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![expect(clippy::unwrap_used)] +#![expect(clippy::cast_possible_truncation)] + +use divan::Bencher; +use rand::RngExt; +use rand::SeedableRng; +use rand::rngs::StdRng; +use vortex_array::IntoArray; +use vortex_array::patches::Patches; +use vortex_buffer::Buffer; + +fn main() { + divan::main(); +} + +const ARRAY_LEN: usize = 1_000_000; +const NUM_PATCHES: usize = 100; +const NUM_QUERIES: usize = 10_000; +const PATCH_LOW: usize = 100_000; +const PATCH_HIGH: usize = 110_000; + +fn narrow_band_patches() -> Patches { + let mut rng = StdRng::seed_from_u64(42); + let mut indices: Vec = (0..NUM_PATCHES) + .map(|_| rng.random_range((PATCH_LOW as u64)..(PATCH_HIGH as u64))) + .collect(); + indices.sort(); + indices.dedup(); + let values: Buffer = (0..indices.len() as i32).collect(); + Patches::new( + ARRAY_LEN, + 0, + Buffer::from(indices).into_array(), + values.into_array(), + None, + ) + .unwrap() +} + +fn full_range_patches() -> Patches { + let mut rng = StdRng::seed_from_u64(43); + let mut indices: Vec = (0..NUM_PATCHES) + .map(|_| rng.random_range(0..(ARRAY_LEN as u64))) + .collect(); + indices.sort(); + indices.dedup(); + let values: Buffer = (0..indices.len() as i32).collect(); + Patches::new( + ARRAY_LEN, + 0, + Buffer::from(indices).into_array(), + values.into_array(), + None, + ) + .unwrap() +} + +#[divan::bench] +fn search_index_below_min(bencher: Bencher) { + let patches = narrow_band_patches(); + let queries: Vec = (0..NUM_QUERIES).collect(); + + bencher.bench_local(|| { + for &q in &queries { + std::hint::black_box(patches.search_index(q).unwrap()); + } + }); +} + +#[divan::bench] +fn search_index_above_max(bencher: Bencher) { + let patches = narrow_band_patches(); + let queries: Vec = (PATCH_HIGH..(PATCH_HIGH + NUM_QUERIES)).collect(); + + bencher.bench_local(|| { + for &q in &queries { + std::hint::black_box(patches.search_index(q).unwrap()); + } + }); +} + +#[divan::bench] +fn search_index_mixed_out_of_range(bencher: Bencher) { + let patches = narrow_band_patches(); + let queries: Vec = (0..NUM_QUERIES / 2) + .map(|i| i * 100) + .chain((0..NUM_QUERIES / 2).map(|i| PATCH_HIGH + i * 50)) + .collect(); + + bencher.bench_local(|| { + for &q in &queries { + std::hint::black_box(patches.search_index(q).unwrap()); + } + }); +} + +#[divan::bench] +fn search_index_in_range(bencher: Bencher) { + let patches = narrow_band_patches(); + let mut rng = StdRng::seed_from_u64(7); + let queries: Vec = (0..NUM_QUERIES) + .map(|_| rng.random_range(PATCH_LOW..PATCH_HIGH)) + .collect(); + + bencher.bench_local(|| { + for &q in &queries { + std::hint::black_box(patches.search_index(q).unwrap()); + } + }); +} + +#[divan::bench] +fn search_index_full_range_random(bencher: Bencher) { + let patches = full_range_patches(); + let mut rng = StdRng::seed_from_u64(11); + let queries: Vec = (0..NUM_QUERIES) + .map(|_| rng.random_range(0..ARRAY_LEN)) + .collect(); + + bencher.bench_local(|| { + for &q in &queries { + std::hint::black_box(patches.search_index(q).unwrap()); + } + }); +} + +#[divan::bench] +fn get_patched_above_max(bencher: Bencher) { + let patches = narrow_band_patches(); + let queries: Vec = (PATCH_HIGH..(PATCH_HIGH + NUM_QUERIES)).collect(); + + bencher.bench_local(|| { + for &q in &queries { + std::hint::black_box(patches.get_patched(q).unwrap()); + } + }); +} diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index fa532546b50..43f267867f7 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -5,6 +5,7 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; use std::ops::Range; +use std::sync::OnceLock; use num_traits::NumCast; use vortex_buffer::BitBuffer; @@ -146,6 +147,10 @@ pub struct Patches { /// `offset_within_chunk` is necessary in order to keep track of how many /// elements were sliced off within the chunk. offset_within_chunk: Option, + /// Cached `indices[0] - offset`. + min_index: OnceLock, + /// Cached `indices[len - 1] - offset`. + max_index: OnceLock, } impl Patches { @@ -172,6 +177,9 @@ impl Patches { ); vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty"); + let min_index = OnceLock::new(); + let max_index = OnceLock::new(); + // Perform validation of components when they are host-resident. // This is not possible to do eagerly when the data is on GPU memory. if indices.is_host() && values.is_host() { @@ -185,6 +193,14 @@ impl Patches { "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}" ); + // Pre-populate bounds caches for the search_index fast path. + let min = usize::try_from( + &indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?, + ) + .map_err(|_| vortex_err!("indices must be a number"))?; + let _ = max_index.set(max - offset); + let _ = min_index.set(min - offset); + #[cfg(debug_assertions)] { use crate::VortexSessionExecute; @@ -205,6 +221,8 @@ impl Patches { chunk_offsets: chunk_offsets.clone(), // Initialize with `Some(0)` only if `chunk_offsets` are set. offset_within_chunk: chunk_offsets.map(|_| 0), + min_index, + max_index, }) } @@ -232,6 +250,8 @@ impl Patches { values, chunk_offsets, offset_within_chunk, + min_index: OnceLock::new(), + max_index: OnceLock::new(), } } @@ -394,6 +414,10 @@ impl Patches { /// [`SearchResult::Found(patch_idx)`]: SearchResult::Found /// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound pub fn search_index(&self, index: usize) -> VortexResult { + if let Some(result) = self.search_index_out_of_range(index) { + return Ok(result); + } + if self.chunk_offsets.is_some() { return self.search_index_chunked(index); } @@ -401,6 +425,20 @@ impl Patches { Self::search_index_binary_search(&self.indices, index + self.offset) } + /// Returns the search result if `index` falls outside the cached bounds, + /// or `None` if the bounds are uncached or `index` is in range. + fn search_index_out_of_range(&self, index: usize) -> Option { + let min = *self.min_index.get()?; + let max = *self.max_index.get()?; + if index < min { + Some(SearchResult::NotFound(0)) + } else if index > max { + Some(SearchResult::NotFound(self.indices.len())) + } else { + None + } + } + /// Binary searches for `needle` in the indices array. /// /// # Returns @@ -551,19 +589,27 @@ impl Patches { }) } - /// Returns the minimum patch index + /// Returns the minimum patch index. pub fn min_index(&self) -> VortexResult { + if let Some(&v) = self.min_index.get() { + return Ok(v); + } let first = self .indices .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())? .as_primitive() .as_::() .ok_or_else(|| vortex_err!("index does not fit in usize"))?; - Ok(first - self.offset) + let result = first - self.offset; + let _ = self.min_index.set(result); + Ok(result) } - /// Returns the maximum patch index + /// Returns the maximum patch index. pub fn max_index(&self) -> VortexResult { + if let Some(&v) = self.max_index.get() { + return Ok(v); + } let last = self .indices .execute_scalar( @@ -573,7 +619,9 @@ impl Patches { .as_primitive() .as_::() .ok_or_else(|| vortex_err!("index does not fit in usize"))?; - Ok(last - self.offset) + let result = last - self.offset; + let _ = self.max_index.set(result); + Ok(result) } /// Filter the patches by a mask, resulting in new patches for the filtered array. @@ -648,6 +696,8 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after a filter is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, + min_index: OnceLock::new(), + max_index: OnceLock::new(), })) } @@ -695,6 +745,8 @@ impl Patches { values, chunk_offsets: new_chunk_offsets, offset_within_chunk, + min_index: OnceLock::new(), + max_index: OnceLock::new(), })) } @@ -830,6 +882,8 @@ impl Patches { .take(PrimitiveArray::new(values_indices, values_validity).into_array())?, chunk_offsets: None, offset_within_chunk: Some(0), // Reset when creating new Patches. + min_index: OnceLock::new(), + max_index: OnceLock::new(), })) } @@ -879,6 +933,8 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after take is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, + min_index: OnceLock::new(), + max_index: OnceLock::new(), })) } @@ -902,6 +958,9 @@ impl Patches { values, chunk_offsets: self.chunk_offsets, offset_within_chunk: self.offset_within_chunk, + // indices and offset are preserved, so the cached bounds remain valid. + min_index: self.min_index, + max_index: self.max_index, }) } } @@ -1750,6 +1809,66 @@ mod test { assert_eq!(patches.search_index(9).unwrap(), SearchResult::NotFound(3)); } + #[test] + fn test_search_index_out_of_range_fast_path() { + let patches = Patches::new( + 100, + 0, + buffer![10u64, 20, 30, 40].into_array(), + buffer![1i32, 2, 3, 4].into_array(), + None, + ) + .unwrap(); + + assert_eq!( + patches.search_index_out_of_range(0), + Some(SearchResult::NotFound(0)) + ); + assert_eq!( + patches.search_index_out_of_range(9), + Some(SearchResult::NotFound(0)) + ); + assert_eq!( + patches.search_index_out_of_range(41), + Some(SearchResult::NotFound(4)) + ); + assert_eq!( + patches.search_index_out_of_range(99), + Some(SearchResult::NotFound(4)) + ); + assert_eq!(patches.search_index_out_of_range(10), None); + assert_eq!(patches.search_index_out_of_range(25), None); + assert_eq!(patches.search_index_out_of_range(40), None); + + assert_eq!(patches.search_index(5).unwrap(), SearchResult::NotFound(0)); + assert_eq!(patches.search_index(50).unwrap(), SearchResult::NotFound(4)); + } + + #[test] + fn test_search_index_out_of_range_with_offset() { + let patches = Patches::new( + 100, + 0, + buffer![10u64, 50, 90].into_array(), + buffer![1i32, 2, 3].into_array(), + None, + ) + .unwrap(); + let sliced = patches.slice(40..95).unwrap().unwrap(); + + assert_eq!(sliced.min_index().unwrap(), 10); + assert_eq!(sliced.max_index().unwrap(), 50); + assert_eq!( + sliced.search_index_out_of_range(5), + Some(SearchResult::NotFound(0)) + ); + assert_eq!( + sliced.search_index_out_of_range(54), + Some(SearchResult::NotFound(2)) + ); + assert_eq!(sliced.search_index_out_of_range(30), None); + } + #[test] fn test_mask_boundary_patches() { // Test masking patches at array boundaries From 705b9a580b1006ee5d253310a46f149ac2b5a847 Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Fri, 1 May 2026 16:47:15 +0100 Subject: [PATCH 2/7] 1k Signed-off-by: Baris Palaska --- vortex-array/benches/scalar_at_patches.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-array/benches/scalar_at_patches.rs b/vortex-array/benches/scalar_at_patches.rs index c49aeeaa6c5..42567fd63c8 100644 --- a/vortex-array/benches/scalar_at_patches.rs +++ b/vortex-array/benches/scalar_at_patches.rs @@ -18,7 +18,7 @@ fn main() { const ARRAY_LEN: usize = 1_000_000; const NUM_PATCHES: usize = 100; -const NUM_QUERIES: usize = 10_000; +const NUM_QUERIES: usize = 1_000; const PATCH_LOW: usize = 100_000; const PATCH_HIGH: usize = 110_000; From ecc9ec16a920c9e93e6729f0da397d272692c179 Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Tue, 5 May 2026 14:03:24 +0100 Subject: [PATCH 3/7] rm bench Signed-off-by: Baris Palaska --- vortex-array/Cargo.toml | 4 - vortex-array/benches/scalar_at_patches.rs | 140 ---------------------- 2 files changed, 144 deletions(-) delete mode 100644 vortex-array/benches/scalar_at_patches.rs diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 5489d22cfb9..f8676d76ef0 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -124,10 +124,6 @@ harness = false name = "scalar_at_struct" harness = false -[[bench]] -name = "scalar_at_patches" -harness = false - [[bench]] name = "varbinview_compact" harness = false diff --git a/vortex-array/benches/scalar_at_patches.rs b/vortex-array/benches/scalar_at_patches.rs deleted file mode 100644 index 42567fd63c8..00000000000 --- a/vortex-array/benches/scalar_at_patches.rs +++ /dev/null @@ -1,140 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -#![expect(clippy::unwrap_used)] -#![expect(clippy::cast_possible_truncation)] - -use divan::Bencher; -use rand::RngExt; -use rand::SeedableRng; -use rand::rngs::StdRng; -use vortex_array::IntoArray; -use vortex_array::patches::Patches; -use vortex_buffer::Buffer; - -fn main() { - divan::main(); -} - -const ARRAY_LEN: usize = 1_000_000; -const NUM_PATCHES: usize = 100; -const NUM_QUERIES: usize = 1_000; -const PATCH_LOW: usize = 100_000; -const PATCH_HIGH: usize = 110_000; - -fn narrow_band_patches() -> Patches { - let mut rng = StdRng::seed_from_u64(42); - let mut indices: Vec = (0..NUM_PATCHES) - .map(|_| rng.random_range((PATCH_LOW as u64)..(PATCH_HIGH as u64))) - .collect(); - indices.sort(); - indices.dedup(); - let values: Buffer = (0..indices.len() as i32).collect(); - Patches::new( - ARRAY_LEN, - 0, - Buffer::from(indices).into_array(), - values.into_array(), - None, - ) - .unwrap() -} - -fn full_range_patches() -> Patches { - let mut rng = StdRng::seed_from_u64(43); - let mut indices: Vec = (0..NUM_PATCHES) - .map(|_| rng.random_range(0..(ARRAY_LEN as u64))) - .collect(); - indices.sort(); - indices.dedup(); - let values: Buffer = (0..indices.len() as i32).collect(); - Patches::new( - ARRAY_LEN, - 0, - Buffer::from(indices).into_array(), - values.into_array(), - None, - ) - .unwrap() -} - -#[divan::bench] -fn search_index_below_min(bencher: Bencher) { - let patches = narrow_band_patches(); - let queries: Vec = (0..NUM_QUERIES).collect(); - - bencher.bench_local(|| { - for &q in &queries { - std::hint::black_box(patches.search_index(q).unwrap()); - } - }); -} - -#[divan::bench] -fn search_index_above_max(bencher: Bencher) { - let patches = narrow_band_patches(); - let queries: Vec = (PATCH_HIGH..(PATCH_HIGH + NUM_QUERIES)).collect(); - - bencher.bench_local(|| { - for &q in &queries { - std::hint::black_box(patches.search_index(q).unwrap()); - } - }); -} - -#[divan::bench] -fn search_index_mixed_out_of_range(bencher: Bencher) { - let patches = narrow_band_patches(); - let queries: Vec = (0..NUM_QUERIES / 2) - .map(|i| i * 100) - .chain((0..NUM_QUERIES / 2).map(|i| PATCH_HIGH + i * 50)) - .collect(); - - bencher.bench_local(|| { - for &q in &queries { - std::hint::black_box(patches.search_index(q).unwrap()); - } - }); -} - -#[divan::bench] -fn search_index_in_range(bencher: Bencher) { - let patches = narrow_band_patches(); - let mut rng = StdRng::seed_from_u64(7); - let queries: Vec = (0..NUM_QUERIES) - .map(|_| rng.random_range(PATCH_LOW..PATCH_HIGH)) - .collect(); - - bencher.bench_local(|| { - for &q in &queries { - std::hint::black_box(patches.search_index(q).unwrap()); - } - }); -} - -#[divan::bench] -fn search_index_full_range_random(bencher: Bencher) { - let patches = full_range_patches(); - let mut rng = StdRng::seed_from_u64(11); - let queries: Vec = (0..NUM_QUERIES) - .map(|_| rng.random_range(0..ARRAY_LEN)) - .collect(); - - bencher.bench_local(|| { - for &q in &queries { - std::hint::black_box(patches.search_index(q).unwrap()); - } - }); -} - -#[divan::bench] -fn get_patched_above_max(bencher: Bencher) { - let patches = narrow_band_patches(); - let queries: Vec = (PATCH_HIGH..(PATCH_HIGH + NUM_QUERIES)).collect(); - - bencher.bench_local(|| { - for &q in &queries { - std::hint::black_box(patches.get_patched(q).unwrap()); - } - }); -} From 78b52ff00366b9c188b1bf87edbc198777fd85f8 Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Tue, 5 May 2026 14:20:55 +0100 Subject: [PATCH 4/7] use stats Signed-off-by: Baris Palaska --- vortex-array/src/patches.rs | 101 +++++++++++++----------------------- 1 file changed, 37 insertions(+), 64 deletions(-) diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index 43f267867f7..b3ec34c0783 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -5,7 +5,6 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; use std::ops::Range; -use std::sync::OnceLock; use num_traits::NumCast; use vortex_buffer::BitBuffer; @@ -35,6 +34,9 @@ use crate::dtype::Nullability; use crate::dtype::Nullability::NonNullable; use crate::dtype::PType; use crate::dtype::UnsignedPType; +use crate::expr::stats::Precision; +use crate::expr::stats::Stat; +use crate::expr::stats::StatsProvider; use crate::match_each_integer_ptype; use crate::match_each_unsigned_integer_ptype; use crate::scalar::PValue; @@ -147,10 +149,6 @@ pub struct Patches { /// `offset_within_chunk` is necessary in order to keep track of how many /// elements were sliced off within the chunk. offset_within_chunk: Option, - /// Cached `indices[0] - offset`. - min_index: OnceLock, - /// Cached `indices[len - 1] - offset`. - max_index: OnceLock, } impl Patches { @@ -177,29 +175,29 @@ impl Patches { ); vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty"); - let min_index = OnceLock::new(); - let max_index = OnceLock::new(); - // Perform validation of components when they are host-resident. // This is not possible to do eagerly when the data is on GPU memory. if indices.is_host() && values.is_host() { - let max = usize::try_from(&indices.execute_scalar( + let last = indices.execute_scalar( indices.len() - 1, &mut LEGACY_SESSION.create_execution_ctx(), - )?) - .map_err(|_| vortex_err!("indices must be a number"))?; + )?; + let max = + usize::try_from(&last).map_err(|_| vortex_err!("indices must be a number"))?; vortex_ensure!( max - offset < array_len, "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}" ); - // Pre-populate bounds caches for the search_index fast path. - let min = usize::try_from( - &indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?, - ) - .map_err(|_| vortex_err!("indices must be a number"))?; - let _ = max_index.set(max - offset); - let _ = min_index.set(min - offset); + // Seed Min/Max stats on indices so search_index can short-circuit + // out-of-range lookups without recomputing them. Indices are + // non-nullable per the validation above, so values are always present. + let first = indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?; + if let (Some(min_value), Some(max_value)) = (first.value(), last.value()) { + let stats = indices.statistics(); + stats.set(Stat::Min, Precision::Exact(min_value.clone())); + stats.set(Stat::Max, Precision::Exact(max_value.clone())); + } #[cfg(debug_assertions)] { @@ -221,8 +219,6 @@ impl Patches { chunk_offsets: chunk_offsets.clone(), // Initialize with `Some(0)` only if `chunk_offsets` are set. offset_within_chunk: chunk_offsets.map(|_| 0), - min_index, - max_index, }) } @@ -250,8 +246,6 @@ impl Patches { values, chunk_offsets, offset_within_chunk, - min_index: OnceLock::new(), - max_index: OnceLock::new(), } } @@ -425,14 +419,17 @@ impl Patches { Self::search_index_binary_search(&self.indices, index + self.offset) } - /// Returns the search result if `index` falls outside the cached bounds, - /// or `None` if the bounds are uncached or `index` is in range. + /// Returns the search result if `index` falls outside the indices' Min/Max + /// stats, or `None` if those stats aren't populated or `index` is in range. + /// Reads stats directly without triggering computation. fn search_index_out_of_range(&self, index: usize) -> Option { - let min = *self.min_index.get()?; - let max = *self.max_index.get()?; - if index < min { + let stats = self.indices.statistics(); + let min = usize::try_from(&stats.get(Stat::Min)?.as_exact()?).ok()?; + let max = usize::try_from(&stats.get(Stat::Max)?.as_exact()?).ok()?; + let needle = index + self.offset; + if needle < min { Some(SearchResult::NotFound(0)) - } else if index > max { + } else if needle > max { Some(SearchResult::NotFound(self.indices.len())) } else { None @@ -591,37 +588,24 @@ impl Patches { /// Returns the minimum patch index. pub fn min_index(&self) -> VortexResult { - if let Some(&v) = self.min_index.get() { - return Ok(v); - } - let first = self + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let min: usize = self .indices - .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())? - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("index does not fit in usize"))?; - let result = first - self.offset; - let _ = self.min_index.set(result); - Ok(result) + .statistics() + .compute_min(&mut ctx) + .ok_or_else(|| vortex_err!("min index unavailable"))?; + Ok(min - self.offset) } /// Returns the maximum patch index. pub fn max_index(&self) -> VortexResult { - if let Some(&v) = self.max_index.get() { - return Ok(v); - } - let last = self + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let max: usize = self .indices - .execute_scalar( - self.indices.len() - 1, - &mut LEGACY_SESSION.create_execution_ctx(), - )? - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("index does not fit in usize"))?; - let result = last - self.offset; - let _ = self.max_index.set(result); - Ok(result) + .statistics() + .compute_max(&mut ctx) + .ok_or_else(|| vortex_err!("max index unavailable"))?; + Ok(max - self.offset) } /// Filter the patches by a mask, resulting in new patches for the filtered array. @@ -696,8 +680,6 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after a filter is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, - min_index: OnceLock::new(), - max_index: OnceLock::new(), })) } @@ -745,8 +727,6 @@ impl Patches { values, chunk_offsets: new_chunk_offsets, offset_within_chunk, - min_index: OnceLock::new(), - max_index: OnceLock::new(), })) } @@ -882,8 +862,6 @@ impl Patches { .take(PrimitiveArray::new(values_indices, values_validity).into_array())?, chunk_offsets: None, offset_within_chunk: Some(0), // Reset when creating new Patches. - min_index: OnceLock::new(), - max_index: OnceLock::new(), })) } @@ -933,8 +911,6 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after take is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, - min_index: OnceLock::new(), - max_index: OnceLock::new(), })) } @@ -958,9 +934,6 @@ impl Patches { values, chunk_offsets: self.chunk_offsets, offset_within_chunk: self.offset_within_chunk, - // indices and offset are preserved, so the cached bounds remain valid. - min_index: self.min_index, - max_index: self.max_index, }) } } From ed818d17cb0593e3c4330ceaf6c4085cc0c75911 Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Wed, 6 May 2026 11:08:33 +0100 Subject: [PATCH 5/7] still cache bounds Signed-off-by: Baris Palaska --- vortex-array/src/patches.rs | 49 ++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index b3ec34c0783..b9f0d6ab877 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -5,6 +5,7 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; use std::ops::Range; +use std::sync::OnceLock; use num_traits::NumCast; use vortex_buffer::BitBuffer; @@ -149,6 +150,8 @@ pub struct Patches { /// `offset_within_chunk` is necessary in order to keep track of how many /// elements were sliced off within the chunk. offset_within_chunk: Option, + /// Memoized offset-adjusted bounds; avoids a stats lookup per `search_index` call. + bounds: OnceLock<(usize, usize)>, } impl Patches { @@ -175,6 +178,8 @@ impl Patches { ); vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty"); + let bounds = OnceLock::new(); + // Perform validation of components when they are host-resident. // This is not possible to do eagerly when the data is on GPU memory. if indices.is_host() && values.is_host() { @@ -189,15 +194,18 @@ impl Patches { "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}" ); - // Seed Min/Max stats on indices so search_index can short-circuit - // out-of-range lookups without recomputing them. Indices are - // non-nullable per the validation above, so values are always present. let first = indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?; + let min = + usize::try_from(&first).map_err(|_| vortex_err!("indices must be a number"))?; + + // Seed indices stats so any other consumer (pruning, predicate pushdown) + // sees them, and pre-populate the bounds cache for the search_index fast path. if let (Some(min_value), Some(max_value)) = (first.value(), last.value()) { let stats = indices.statistics(); stats.set(Stat::Min, Precision::Exact(min_value.clone())); stats.set(Stat::Max, Precision::Exact(max_value.clone())); } + let _ = bounds.set((min - offset, max - offset)); #[cfg(debug_assertions)] { @@ -219,6 +227,7 @@ impl Patches { chunk_offsets: chunk_offsets.clone(), // Initialize with `Some(0)` only if `chunk_offsets` are set. offset_within_chunk: chunk_offsets.map(|_| 0), + bounds, }) } @@ -246,6 +255,7 @@ impl Patches { values, chunk_offsets, offset_within_chunk, + bounds: OnceLock::new(), } } @@ -419,23 +429,32 @@ impl Patches { Self::search_index_binary_search(&self.indices, index + self.offset) } - /// Returns the search result if `index` falls outside the indices' Min/Max - /// stats, or `None` if those stats aren't populated or `index` is in range. - /// Reads stats directly without triggering computation. + #[inline] fn search_index_out_of_range(&self, index: usize) -> Option { - let stats = self.indices.statistics(); - let min = usize::try_from(&stats.get(Stat::Min)?.as_exact()?).ok()?; - let max = usize::try_from(&stats.get(Stat::Max)?.as_exact()?).ok()?; - let needle = index + self.offset; - if needle < min { + let (min, max) = self.cached_bounds()?; + if index < min { Some(SearchResult::NotFound(0)) - } else if needle > max { + } else if index > max { Some(SearchResult::NotFound(self.indices.len())) } else { None } } + #[inline] + fn cached_bounds(&self) -> Option<(usize, usize)> { + if let Some(&b) = self.bounds.get() { + return Some(b); + } + let stats = self.indices.statistics(); + let raw_min = usize::try_from(&stats.get(Stat::Min)?.as_exact()?).ok()?; + let raw_max = usize::try_from(&stats.get(Stat::Max)?.as_exact()?).ok()?; + let min = raw_min.checked_sub(self.offset)?; + let max = raw_max.checked_sub(self.offset)?; + let _ = self.bounds.set((min, max)); + Some((min, max)) + } + /// Binary searches for `needle` in the indices array. /// /// # Returns @@ -680,6 +699,7 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after a filter is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, + bounds: OnceLock::new(), })) } @@ -727,6 +747,7 @@ impl Patches { values, chunk_offsets: new_chunk_offsets, offset_within_chunk, + bounds: OnceLock::new(), })) } @@ -862,6 +883,7 @@ impl Patches { .take(PrimitiveArray::new(values_indices, values_validity).into_array())?, chunk_offsets: None, offset_within_chunk: Some(0), // Reset when creating new Patches. + bounds: OnceLock::new(), })) } @@ -911,6 +933,7 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after take is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, + bounds: OnceLock::new(), })) } @@ -934,6 +957,8 @@ impl Patches { values, chunk_offsets: self.chunk_offsets, offset_within_chunk: self.offset_within_chunk, + // indices and offset are preserved, so cached bounds remain valid. + bounds: self.bounds, }) } } From c4d519567a125c04bfd7c25d2b48a8a4119e313b Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Wed, 6 May 2026 11:59:50 +0100 Subject: [PATCH 6/7] min and max index use cached bounds Signed-off-by: Baris Palaska --- vortex-array/src/patches.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index b9f0d6ab877..00920a1d440 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -607,24 +607,32 @@ impl Patches { /// Returns the minimum patch index. pub fn min_index(&self) -> VortexResult { + if let Some((min, _)) = self.cached_bounds() { + return Ok(min); + } let mut ctx = LEGACY_SESSION.create_execution_ctx(); - let min: usize = self + let raw: usize = self .indices .statistics() .compute_min(&mut ctx) .ok_or_else(|| vortex_err!("min index unavailable"))?; - Ok(min - self.offset) + raw.checked_sub(self.offset) + .ok_or_else(|| vortex_err!("offset {} exceeds min index {}", self.offset, raw)) } /// Returns the maximum patch index. pub fn max_index(&self) -> VortexResult { + if let Some((_, max)) = self.cached_bounds() { + return Ok(max); + } let mut ctx = LEGACY_SESSION.create_execution_ctx(); - let max: usize = self + let raw: usize = self .indices .statistics() .compute_max(&mut ctx) .ok_or_else(|| vortex_err!("max index unavailable"))?; - Ok(max - self.offset) + raw.checked_sub(self.offset) + .ok_or_else(|| vortex_err!("offset {} exceeds max index {}", self.offset, raw)) } /// Filter the patches by a mask, resulting in new patches for the filtered array. From f99eabf17f47ea5446f43198fd25f06c233fb03a Mon Sep 17 00:00:00 2001 From: Baris Palaska Date: Wed, 6 May 2026 16:23:23 +0100 Subject: [PATCH 7/7] arcswap + scalarref Signed-off-by: Baris Palaska --- vortex-array/src/patches.rs | 39 ++++----------- vortex-array/src/stats/array.rs | 77 +++++++++++++++++------------ vortex-array/src/stats/stats_set.rs | 15 ++++++ 3 files changed, 69 insertions(+), 62 deletions(-) diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index 00920a1d440..fc736afa6dc 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -5,7 +5,6 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; use std::ops::Range; -use std::sync::OnceLock; use num_traits::NumCast; use vortex_buffer::BitBuffer; @@ -37,7 +36,6 @@ use crate::dtype::PType; use crate::dtype::UnsignedPType; use crate::expr::stats::Precision; use crate::expr::stats::Stat; -use crate::expr::stats::StatsProvider; use crate::match_each_integer_ptype; use crate::match_each_unsigned_integer_ptype; use crate::scalar::PValue; @@ -150,8 +148,6 @@ pub struct Patches { /// `offset_within_chunk` is necessary in order to keep track of how many /// elements were sliced off within the chunk. offset_within_chunk: Option, - /// Memoized offset-adjusted bounds; avoids a stats lookup per `search_index` call. - bounds: OnceLock<(usize, usize)>, } impl Patches { @@ -178,8 +174,6 @@ impl Patches { ); vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty"); - let bounds = OnceLock::new(); - // Perform validation of components when they are host-resident. // This is not possible to do eagerly when the data is on GPU memory. if indices.is_host() && values.is_host() { @@ -194,18 +188,15 @@ impl Patches { "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}" ); + // Seed Min/Max stats on indices so search_index can short-circuit + // out-of-range lookups, and so pruning/predicate-pushdown consumers + // see populated bounds. let first = indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?; - let min = - usize::try_from(&first).map_err(|_| vortex_err!("indices must be a number"))?; - - // Seed indices stats so any other consumer (pruning, predicate pushdown) - // sees them, and pre-populate the bounds cache for the search_index fast path. if let (Some(min_value), Some(max_value)) = (first.value(), last.value()) { let stats = indices.statistics(); stats.set(Stat::Min, Precision::Exact(min_value.clone())); stats.set(Stat::Max, Precision::Exact(max_value.clone())); } - let _ = bounds.set((min - offset, max - offset)); #[cfg(debug_assertions)] { @@ -227,7 +218,6 @@ impl Patches { chunk_offsets: chunk_offsets.clone(), // Initialize with `Some(0)` only if `chunk_offsets` are set. offset_within_chunk: chunk_offsets.map(|_| 0), - bounds, }) } @@ -255,7 +245,6 @@ impl Patches { values, chunk_offsets, offset_within_chunk, - bounds: OnceLock::new(), } } @@ -443,16 +432,12 @@ impl Patches { #[inline] fn cached_bounds(&self) -> Option<(usize, usize)> { - if let Some(&b) = self.bounds.get() { - return Some(b); - } - let stats = self.indices.statistics(); - let raw_min = usize::try_from(&stats.get(Stat::Min)?.as_exact()?).ok()?; - let raw_max = usize::try_from(&stats.get(Stat::Max)?.as_exact()?).ok()?; - let min = raw_min.checked_sub(self.offset)?; - let max = raw_max.checked_sub(self.offset)?; - let _ = self.bounds.set((min, max)); - Some((min, max)) + let offset = self.offset; + self.indices.statistics().with_typed_stats_set(|typed| { + let raw_min = usize::try_from(typed.get_value(Stat::Min)?.as_exact()?).ok()?; + let raw_max = usize::try_from(typed.get_value(Stat::Max)?.as_exact()?).ok()?; + Some((raw_min.checked_sub(offset)?, raw_max.checked_sub(offset)?)) + }) } /// Binary searches for `needle` in the indices array. @@ -707,7 +692,6 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after a filter is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, - bounds: OnceLock::new(), })) } @@ -755,7 +739,6 @@ impl Patches { values, chunk_offsets: new_chunk_offsets, offset_within_chunk, - bounds: OnceLock::new(), })) } @@ -891,7 +874,6 @@ impl Patches { .take(PrimitiveArray::new(values_indices, values_validity).into_array())?, chunk_offsets: None, offset_within_chunk: Some(0), // Reset when creating new Patches. - bounds: OnceLock::new(), })) } @@ -941,7 +923,6 @@ impl Patches { // TODO(0ax1): Chunk offsets are invalid after take is applied. chunk_offsets: None, offset_within_chunk: self.offset_within_chunk, - bounds: OnceLock::new(), })) } @@ -965,8 +946,6 @@ impl Patches { values, chunk_offsets: self.chunk_offsets, offset_within_chunk: self.offset_within_chunk, - // indices and offset are preserved, so cached bounds remain valid. - bounds: self.bounds, }) } } diff --git a/vortex-array/src/stats/array.rs b/vortex-array/src/stats/array.rs index 248ca3587f2..a7fcf843f8f 100644 --- a/vortex-array/src/stats/array.rs +++ b/vortex-array/src/stats/array.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use parking_lot::RwLock; +use arc_swap::ArcSwap; use vortex_array::ExecutionCtx; use vortex_error::VortexError; use vortex_error::VortexResult; @@ -32,9 +32,20 @@ use crate::scalar::ScalarValue; /// A shared [`StatsSet`] stored in an array. Can be shared by copies of the array and can also be mutated in place. // TODO(adamg): This is a very bad name. -#[derive(Clone, Default, Debug)] +#[derive(Clone, Debug)] pub struct ArrayStats { - inner: Arc>, + // Lock-free reads via copy-on-write. Writes are last-writer-wins; + // concurrent writers may lose updates, which is acceptable for stats + // (they're hints and can be recomputed). + inner: Arc>, +} + +impl Default for ArrayStats { + fn default() -> Self { + Self { + inner: Arc::new(ArcSwap::from_pointee(StatsSet::default())), + } + } } /// Reference to an array's [`StatsSet`]. Can be used to get and mutate the underlying stats. @@ -55,42 +66,49 @@ impl ArrayStats { } pub fn set(&self, stat: Stat, value: Precision) { - self.inner.write().set(stat, value); + let mut new_stats = (**self.inner.load()).clone(); + new_stats.set(stat, value); + self.inner.store(Arc::new(new_stats)); } pub fn clear(&self, stat: Stat) { - self.inner.write().clear(stat); + let mut new_stats = (**self.inner.load()).clone(); + new_stats.clear(stat); + self.inner.store(Arc::new(new_stats)); } pub fn retain(&self, stats: &[Stat]) { - self.inner.write().retain_only(stats); + let mut new_stats = (**self.inner.load()).clone(); + new_stats.retain_only(stats); + self.inner.store(Arc::new(new_stats)); } } impl From for ArrayStats { fn from(value: StatsSet) -> Self { Self { - inner: Arc::new(RwLock::new(value)), + inner: Arc::new(ArcSwap::from_pointee(value)), } } } impl From for StatsSet { fn from(value: ArrayStats) -> Self { - value.inner.read().clone() + (**value.inner.load()).clone() } } impl StatsSetRef<'_> { pub(crate) fn replace(&self, stats: StatsSet) { - *self.array_stats.inner.write() = stats; + self.array_stats.inner.store(Arc::new(stats)); } pub fn set_iter(&self, iter: StatsSetIntoIter) { - let mut guard = self.array_stats.inner.write(); + let mut new_stats = (**self.array_stats.inner.load()).clone(); for (stat, value) in iter { - guard.set(stat, value); + new_stats.set(stat, value); } + self.array_stats.inner.store(Arc::new(new_stats)); } pub fn inherit_from(&self, stats: StatsSetRef<'_>) { @@ -101,38 +119,33 @@ impl StatsSetRef<'_> { } pub fn inherit<'a>(&self, iter: impl Iterator)>) { - let mut guard = self.array_stats.inner.write(); + let mut new_stats = (**self.array_stats.inner.load()).clone(); for (stat, value) in iter { if !value.is_exact() { - if !guard.get(*stat).is_some_and(|v| v.is_exact()) { - guard.set(*stat, value.clone()); + if !new_stats.get(*stat).is_some_and(|v| v.is_exact()) { + new_stats.set(*stat, value.clone()); } } else { - guard.set(*stat, value.clone()); + new_stats.set(*stat, value.clone()); } } + self.array_stats.inner.store(Arc::new(new_stats)); } pub fn with_typed_stats_set U>(&self, apply: F) -> U { - apply( - self.array_stats - .inner - .read() - .as_typed_ref(self.dyn_array_ref.dtype()), - ) + let snapshot = self.array_stats.inner.load(); + apply(snapshot.as_typed_ref(self.dyn_array_ref.dtype())) } pub fn with_mut_typed_stats_set U>(&self, apply: F) -> U { - apply( - self.array_stats - .inner - .write() - .as_mut_typed_ref(self.dyn_array_ref.dtype()), - ) + let mut new_stats = (**self.array_stats.inner.load()).clone(); + let result = apply(new_stats.as_mut_typed_ref(self.dyn_array_ref.dtype())); + self.array_stats.inner.store(Arc::new(new_stats)); + result } pub fn to_owned(&self) -> StatsSet { - self.array_stats.inner.read().clone() + (**self.array_stats.inner.load()).clone() } /// Returns a clone of the underlying [`ArrayStats`]. @@ -149,8 +162,8 @@ impl StatsSetRef<'_> { &self, f: F, ) -> R { - let lock = self.array_stats.inner.read(); - f(&mut lock.iter()) + let snapshot = self.array_stats.inner.load(); + f(&mut snapshot.iter()) } pub fn compute_stat(&self, stat: Stat, ctx: &mut ExecutionCtx) -> VortexResult> { @@ -288,12 +301,12 @@ impl StatsProvider for StatsSetRef<'_> { fn get(&self, stat: Stat) -> Option> { self.array_stats .inner - .read() + .load() .as_typed_ref(self.dyn_array_ref.dtype()) .get(stat) } fn len(&self) -> usize { - self.array_stats.inner.read().len() + self.array_stats.inner.load().len() } } diff --git a/vortex-array/src/stats/stats_set.rs b/vortex-array/src/stats/stats_set.rs index a0ca186fed6..b82b4a077a8 100644 --- a/vortex-array/src/stats/stats_set.rs +++ b/vortex-array/src/stats/stats_set.rs @@ -114,6 +114,14 @@ impl StatsSet { .map(|(_, v)| v.clone()) } + /// Borrow the value for a given stat without cloning the underlying `ScalarValue`. + pub fn get_value(&self, stat: Stat) -> Option<&Precision> { + self.values + .iter() + .find(|(s, _)| *s == stat) + .map(|(_, v)| v) + } + /// Length of the stats set pub fn len(&self) -> usize { self.values.len() @@ -225,6 +233,13 @@ pub struct TypedStatsSetRef<'a, 'b> { pub dtype: &'b DType, } +impl<'a, 'b> TypedStatsSetRef<'a, 'b> { + /// Borrow the value for a given stat without constructing a [`Scalar`]. + pub fn get_value(&self, stat: Stat) -> Option> { + self.values.get_value(stat).map(|p| p.as_ref()) + } +} + impl StatsProvider for TypedStatsSetRef<'_, '_> { fn get(&self, stat: Stat) -> Option> { self.values.get(stat).map(|p| {