From 500d0a7bb2bdcf4832e92444b37466b610c6ab0b Mon Sep 17 00:00:00 2001
From: Nicholas Gates
Date: Wed, 24 Jun 2026 10:48:19 -0400
Subject: [PATCH] Add OnPair LIKE pushdown

Signed-off-by: Nicholas Gates
---
Reviewer notes (ignored by `git am`; not part of the commit message):

* This copy was restored from a flattened extraction: line breaks and
  indentation were rebuilt from the diff markers, and generic arguments
  that had been stripped (e.g. `VortexResult<Option<ArrayRef>>`,
  `collect_widened::<u16>`) were re-inserted from the types used in the
  surrounding code. Added/removed line counts were re-checked against
  every hunk header and the diffstat below.
* NOTE(review): generic arguments on three pre-existing context lines are
  inferred and must be confirmed against the tree before applying:
  `execute::<Canonical>` (shared/vtable.rs), `OperationsVTable<Shared>`
  (shared/vtable.rs) and `<V as LikeKernel>::like` (like/kernel.rs).
* NOTE(review): the author e-mail addresses on the `From:` and
  `Signed-off-by:` lines were lost in extraction and are not restored.

Summary of the change, per file:

* onpair/compute/like.rs: `LikeKernel` for `OnPair`. Handles only
  constant, case-sensitive patterns that parse to match-all, exact,
  prefix, suffix or contains (a literal with `%` only at the ends; any
  `_` or `\` returns `Ok(None)` so the caller falls back). Rows are
  matched over their code sequences and the dictionary token bytes.
* onpair/kernel.rs: registers `LikeExecuteAdaptor(OnPair)` for `Like`.
* dict/compute/rules.rs: `DictionaryChunkedValuesPullUpRule` rewrites a
  chunked array of dictionaries into one dictionary over chunked codes,
  only when every chunk is a `Dict` with the same codes dtype and
  pointer-equal values.
* dict/vtable/kernel.rs: registers `TakeExecuteAdaptor(Chunked)`.
* filter/kernel.rs, like/kernel.rs: `LikeFilterExecuteAdaptor` executes
  a `Filter` child (slot 0) one step and rebuilds the LIKE node on it.
* shared/vtable.rs, executor.rs: `Shared` forwards `reduce_parent` to its
  current array, and parent kernels are also tried against the array
  wrapped by `Shared`.
* vortex-file/strategy.rs: with `unstable_encodings`, the default write
  strategy excludes `OnPairScheme`.

 .../experimental/onpair/src/compute/like.rs   | 520 ++++++++++++++++++
 .../experimental/onpair/src/compute/mod.rs    |   1 +
 encodings/experimental/onpair/src/kernel.rs   |   3 +
 vortex-array/src/arrays/dict/compute/rules.rs | 107 ++++
 vortex-array/src/arrays/dict/vtable/kernel.rs |   2 +
 vortex-array/src/arrays/filter/kernel.rs      |   9 +-
 vortex-array/src/arrays/shared/vtable.rs      |   8 +
 vortex-array/src/executor.rs                  |  31 ++
 vortex-array/src/scalar_fn/fns/like/kernel.rs |  45 ++
 vortex-file/src/strategy.rs                   |  13 +-
 10 files changed, 735 insertions(+), 4 deletions(-)
 create mode 100644 encodings/experimental/onpair/src/compute/like.rs

diff --git a/encodings/experimental/onpair/src/compute/like.rs b/encodings/experimental/onpair/src/compute/like.rs
new file mode 100644
index 00000000000..84925e18d69
--- /dev/null
+++ b/encodings/experimental/onpair/src/compute/like.rs
@@ -0,0 +1,520 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use memchr::memmem::Finder;
+use vortex_array::ArrayRef;
+use vortex_array::ArrayView;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
+use vortex_array::arrays::BoolArray;
+use vortex_array::scalar_fn::fns::like::LikeKernel;
+use vortex_array::scalar_fn::fns::like::LikeOptions;
+use vortex_buffer::BitBuffer;
+use vortex_error::VortexResult;
+use vortex_error::vortex_ensure;
+use vortex_error::vortex_err;
+
+use crate::OnPair;
+use crate::OnPairArrayExt;
+use crate::OnPairArraySlotsExt;
+use crate::decode::code_boundary_at;
+use crate::decode::collect_widened;
+
+#[derive(Clone, Copy)]
+enum SimpleLike<'a> {
+    All,
+    Exact(&'a [u8]),
+    Prefix(&'a [u8]),
+    Suffix(&'a [u8]),
+    Contains(&'a [u8]),
+}
+
+impl LikeKernel for OnPair {
+    fn like(
+        array: ArrayView<'_, Self>,
+        pattern: &ArrayRef,
+        options: LikeOptions,
+        ctx: &mut ExecutionCtx,
+    ) -> VortexResult<Option<ArrayRef>> {
+        let Some(pattern_scalar) = pattern.as_constant() else {
+            return Ok(None);
+        };
+        if options.case_insensitive {
+            return Ok(None);
+        }
+
+        let pattern_bytes: &[u8] = if let Some(s) = pattern_scalar.as_utf8_opt() {
+            let Some(v) = s.value() else {
+                return Ok(None);
+            };
+            v.as_ref()
+        } else if let Some(b) = pattern_scalar.as_binary_opt() {
+            let Some(v) = b.value() else {
+                return Ok(None);
+            };
+            v
+        } else {
+            return Ok(None);
+        };
+        let Some(parsed) = parse_simple_like(pattern_bytes) else {
+            return Ok(None);
+        };
+
+        let codes_offsets = array.codes_offsets();
+        let code_start = code_boundary_at(codes_offsets, 0, ctx)?;
+        let code_end = code_boundary_at(codes_offsets, array.len(), ctx)?;
+        vortex_ensure!(
+            code_start <= code_end,
+            "OnPair codes_offsets must be nondecreasing"
+        );
+        vortex_ensure!(
+            code_end <= array.codes().len(),
+            "OnPair codes_offsets end {} exceeds codes len {}",
+            code_end,
+            array.codes().len()
+        );
+
+        let codes = collect_widened::<u16>(&array.codes().slice(code_start..code_end)?, ctx)?;
+        let code_offsets = normalize_code_offsets(
+            collect_widened::<u64>(codes_offsets, ctx)?.as_slice(),
+            code_start,
+            code_end,
+        )?;
+        let dict_offsets = collect_widened::<u32>(array.dict_offsets(), ctx)?;
+        let dict_bytes = array.dict_bytes();
+        let dict_bytes = dict_bytes.as_slice();
+        let mut tail = Vec::new();
+        let mut scratch = Vec::new();
+
+        let bits = match parsed {
+            SimpleLike::All => BitBuffer::collect_bool(array.len(), |_| true ^ options.negated),
+            SimpleLike::Exact(needle) => BitBuffer::collect_bool(array.len(), |row| {
+                row_matches_exact(
+                    row_codes(&code_offsets, &codes, row),
+                    dict_bytes,
+                    dict_offsets.as_slice(),
+                    needle,
+                ) ^ options.negated
+            }),
+            SimpleLike::Prefix(needle) => BitBuffer::collect_bool(array.len(), |row| {
+                row_matches_prefix(
+                    row_codes(&code_offsets, &codes, row),
+                    dict_bytes,
+                    dict_offsets.as_slice(),
+                    needle,
+                ) ^ options.negated
+            }),
+            SimpleLike::Suffix(needle) => BitBuffer::collect_bool(array.len(), |row| {
+                row_matches_suffix(
+                    row_codes(&code_offsets, &codes, row),
+                    dict_bytes,
+                    dict_offsets.as_slice(),
+                    needle,
+                    &mut tail,
+                ) ^ options.negated
+            }),
+            SimpleLike::Contains(needle) => {
+                let finder = Finder::new(needle);
+                BitBuffer::collect_bool(array.len(), |row| {
+                    row_matches_contains(
+                        row_codes(&code_offsets, &codes, row),
+                        dict_bytes,
+                        dict_offsets.as_slice(),
+                        needle,
+                        &finder,
+                        &mut tail,
+                        &mut scratch,
+                    ) ^ options.negated
+                })
+            }
+        };
+
+        let validity = array
+            .array_validity()
+            .union_nullability(pattern_scalar.dtype().nullability());
+        Ok(Some(BoolArray::new(bits, validity).into_array()))
+    }
+}
+
+fn normalize_code_offsets(
+    code_offsets: &[u64],
+    code_start: usize,
+    code_end: usize,
+) -> VortexResult<Vec<usize>> {
+    let offsets = code_offsets
+        .iter()
+        .map(|&offset| {
+            usize::try_from(offset)
+                .map_err(|_| vortex_err!("OnPair code offset {} exceeds usize", offset))
+        })
+        .collect::<VortexResult<Vec<_>>>()?;
+
+    for &offset in &offsets {
+        vortex_ensure!(
+            offset >= code_start && offset <= code_end,
+            "OnPair codes offset {} outside row window {}..{}",
+            offset,
+            code_start,
+            code_end
+        );
+    }
+    for window in offsets.windows(2) {
+        vortex_ensure!(
+            window[0] <= window[1],
+            "OnPair codes_offsets must be nondecreasing"
+        );
+    }
+
+    Ok(offsets
+        .into_iter()
+        .map(|offset| offset - code_start)
+        .collect())
+}
+
+fn parse_simple_like(pattern: &[u8]) -> Option<SimpleLike<'_>> {
+    if pattern.is_empty() {
+        return Some(SimpleLike::Exact(b""));
+    }
+    if pattern.iter().any(|&b| matches!(b, b'_' | b'\\')) {
+        return None;
+    }
+
+    let Some(first_literal) = pattern.iter().position(|&b| b != b'%') else {
+        return Some(SimpleLike::All);
+    };
+    let last_literal = pattern.iter().rposition(|&b| b != b'%')? + 1;
+    let literal = &pattern[first_literal..last_literal];
+    if literal.contains(&b'%') {
+        return None;
+    }
+
+    match (first_literal == 0, last_literal == pattern.len()) {
+        (true, true) => Some(SimpleLike::Exact(literal)),
+        (true, false) => Some(SimpleLike::Prefix(literal)),
+        (false, true) => Some(SimpleLike::Suffix(literal)),
+        (false, false) => Some(SimpleLike::Contains(literal)),
+    }
+}
+
+fn row_codes<'a>(code_offsets: &[usize], codes: &'a [u16], row: usize) -> &'a [u16] {
+    let start = code_offsets[row];
+    let end = code_offsets[row + 1];
+    &codes[start..end]
+}
+
+fn token_bytes<'a>(dict_bytes: &'a [u8], dict_offsets: &[u32], code: u16) -> &'a [u8] {
+    let code = usize::from(code);
+    let start = dict_offsets[code] as usize;
+    let end = dict_offsets[code + 1] as usize;
+    &dict_bytes[start..end]
+}
+
+fn row_matches_exact(
+    codes: &[u16],
+    dict_bytes: &[u8],
+    dict_offsets: &[u32],
+    needle: &[u8],
+) -> bool {
+    let mut matched = 0;
+    for &code in codes {
+        let token = token_bytes(dict_bytes, dict_offsets, code);
+        if matched + token.len() > needle.len() {
+            return false;
+        }
+        if token != &needle[matched..matched + token.len()] {
+            return false;
+        }
+        matched += token.len();
+    }
+    matched == needle.len()
+}
+
+fn row_matches_prefix(
+    codes: &[u16],
+    dict_bytes: &[u8],
+    dict_offsets: &[u32],
+    needle: &[u8],
+) -> bool {
+    if needle.is_empty() {
+        return true;
+    }
+
+    let mut matched = 0;
+    for &code in codes {
+        let token = token_bytes(dict_bytes, dict_offsets, code);
+        let take = (needle.len() - matched).min(token.len());
+        if token[..take] != needle[matched..matched + take] {
+            return false;
+        }
+        matched += take;
+        if matched == needle.len() {
+            return true;
+        }
+    }
+    false
+}
+
+fn row_matches_suffix(
+    codes: &[u16],
+    dict_bytes: &[u8],
+    dict_offsets: &[u32],
+    needle: &[u8],
+    tail: &mut Vec<u8>,
+) -> bool {
+    if needle.is_empty() {
+        return true;
+    }
+
+    let mut total_len = 0;
+    tail.clear();
+    for &code in codes {
+        let token = token_bytes(dict_bytes, dict_offsets, code);
+        total_len += token.len();
+        append_tail(tail, token, needle.len());
+    }
+    total_len >= needle.len() && tail.as_slice() == needle
+}
+
+fn row_matches_contains(
+    codes: &[u16],
+    dict_bytes: &[u8],
+    dict_offsets: &[u32],
+    needle: &[u8],
+    finder: &Finder<'_>,
+    tail: &mut Vec<u8>,
+    scratch: &mut Vec<u8>,
+) -> bool {
+    if needle.is_empty() {
+        return true;
+    }
+
+    tail.clear();
+    for &code in codes {
+        let token = token_bytes(dict_bytes, dict_offsets, code);
+        if finder.find(token).is_some() {
+            return true;
+        }
+        if !tail.is_empty() {
+            scratch.clear();
+            scratch.extend_from_slice(tail);
+            scratch.extend_from_slice(token);
+            if finder.find(scratch).is_some() {
+                return true;
+            }
+        }
+        append_tail(tail, token, needle.len() - 1);
+    }
+    false
+}
+
+fn append_tail(tail: &mut Vec<u8>, bytes: &[u8], max_len: usize) {
+    if max_len == 0 {
+        return;
+    }
+    if bytes.len() >= max_len {
+        tail.clear();
+        tail.extend_from_slice(&bytes[bytes.len() - max_len..]);
+        return;
+    }
+    let overflow = tail.len() + bytes.len();
+    if overflow > max_len {
+        tail.drain(..overflow - max_len);
+    }
+    tail.extend_from_slice(bytes);
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::LazyLock;
+
+    use vortex_array::ArrayRef;
+    use vortex_array::Canonical;
+    use vortex_array::IntoArray;
+    use vortex_array::VortexSessionExecute;
+    use vortex_array::arrays::BoolArray;
+    use vortex_array::arrays::ConstantArray;
+    use vortex_array::arrays::ScalarFn;
+    use vortex_array::arrays::SharedArray;
+    use vortex_array::arrays::VarBinArray;
+    use vortex_array::arrays::scalar_fn::ScalarFnFactoryExt;
+    use vortex_array::assert_arrays_eq;
+    use vortex_array::dtype::DType;
+    use vortex_array::dtype::Nullability;
+    use vortex_array::scalar_fn::fns::like::Like;
+    use vortex_array::scalar_fn::fns::like::LikeOptions;
+    use vortex_error::VortexResult;
+    use vortex_mask::Mask;
+    use vortex_session::VortexSession;
+
+    use crate::OnPair;
+    use crate::compress::DEFAULT_DICT12_CONFIG;
+    use crate::compress::onpair_compress;
+
+    static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
+        let session = vortex_array::array_session();
+        crate::initialize(&session);
+        session
+    });
+
+    fn run_like(
+        values: &[Option<&str>],
+        pattern: &str,
+        options: LikeOptions,
+    ) -> VortexResult<BoolArray> {
+        let input =
+            VarBinArray::from_iter(values.iter().copied(), DType::Utf8(Nullability::Nullable));
+        let len = input.len();
+        let dtype = input.dtype().clone();
+        let array = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG)?.into_array();
+        let pattern = ConstantArray::new(pattern, len).into_array();
+        let result = Like
+            .try_new_array(len, options, [array, pattern])?
+            .into_array()
+            .execute::<Canonical>(&mut SESSION.create_execution_ctx())?
+            .into_bool();
+        Ok(result)
+    }
+
+    #[test]
+    fn like_contains() -> VortexResult<()> {
+        let mut ctx = SESSION.create_execution_ctx();
+        let result = run_like(
+            &[
+                Some("https://google.example"),
+                Some("no match"),
+                Some("prefix Google suffix"),
+                None,
+            ],
+            "%Google%",
+            LikeOptions::default(),
+        )?;
+        assert_arrays_eq!(
+            &result,
+            &BoolArray::from_iter([Some(false), Some(false), Some(true), None]),
+            &mut ctx
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn like_prefix_suffix_exact_and_negated() -> VortexResult<()> {
+        let mut ctx = SESSION.create_execution_ctx();
+        let values = [
+            Some("2020-10-01"),
+            Some("2020-11-01"),
+            Some("x-2020-10-01"),
+            Some(""),
+        ];
+        assert_arrays_eq!(
+            &run_like(&values, "2020-10-%", LikeOptions::default())?,
+            &BoolArray::from_iter([Some(true), Some(false), Some(false), Some(false)]),
+            &mut ctx
+        );
+        assert_arrays_eq!(
+            &run_like(&values, "%-01", LikeOptions::default())?,
+            &BoolArray::from_iter([Some(true), Some(true), Some(true), Some(false)]),
+            &mut ctx
+        );
+        assert_arrays_eq!(
+            &run_like(&values, "2020-10-01", LikeOptions::default())?,
+            &BoolArray::from_iter([Some(true), Some(false), Some(false), Some(false)]),
+            &mut ctx
+        );
+        assert_arrays_eq!(
+            &run_like(
+                &values,
+                "%2020%",
+                LikeOptions {
+                    negated: true,
+                    case_insensitive: false,
+                },
+            )?,
+            &BoolArray::from_iter([Some(false), Some(false), Some(false), Some(true)]),
+            &mut ctx
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn like_filtered_onpair_stays_lazy_after_one_step() -> VortexResult<()> {
+        let input = VarBinArray::from_iter(
+            [
+                Some("Google alpha"),
+                Some("irrelevant"),
+                Some("Google beta"),
+                Some("other"),
+            ],
+            DType::Utf8(Nullability::Nullable),
+        );
+        let len = input.len();
+        let dtype = input.dtype().clone();
+        let array = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG)?.into_array();
+        let filtered = array.filter(Mask::from_iter([true, false, true, false]))?;
+        let pattern = ConstantArray::new("%Google%", filtered.len()).into_array();
+        let like = Like
+            .try_new_array(filtered.len(), LikeOptions::default(), [filtered, pattern])?
+            .into_array();
+
+        let stepped = like.execute::<ArrayRef>(&mut SESSION.create_execution_ctx())?;
+        assert!(stepped.is::<ScalarFn>());
+        assert!(stepped.children()[0].is::<OnPair>());
+
+        let result = stepped
+            .execute::<Canonical>(&mut SESSION.create_execution_ctx())?
+            .into_bool();
+        let mut ctx = SESSION.create_execution_ctx();
+        assert_arrays_eq!(
+            &result,
+            &BoolArray::from_iter([Some(true), Some(true)]),
+            &mut ctx
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn filter_shared_onpair_stays_encoded_after_one_step() -> VortexResult<()> {
+        let input = VarBinArray::from_iter(
+            [
+                Some("Google alpha"),
+                Some("irrelevant"),
+                Some("Google beta"),
+                Some("other"),
+            ],
+            DType::Utf8(Nullability::Nullable),
+        );
+        let len = input.len();
+        let dtype = input.dtype().clone();
+        let array = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG)?.into_array();
+        let shared = SharedArray::new(array).into_array();
+        let filtered = shared.filter(Mask::from_iter([true, false, true, false]))?;
+
+        let stepped = filtered.execute::<ArrayRef>(&mut SESSION.create_execution_ctx())?;
+        assert!(stepped.is::<OnPair>());
+        assert_eq!(stepped.len(), 2);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_sliced_onpair_stays_encoded_after_one_step() -> VortexResult<()> {
+        let input = VarBinArray::from_iter(
+            [
+                Some("prefix"),
+                Some("Google alpha"),
+                Some("irrelevant"),
+                Some("Google beta"),
+                Some("suffix"),
+            ],
+            DType::Utf8(Nullability::Nullable),
+        );
+        let len = input.len();
+        let dtype = input.dtype().clone();
+        let array = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG)?.into_array();
+        let sliced = array.slice(1..4)?;
+        let filtered = sliced.filter(Mask::from_iter([true, false, true]))?;
+
+        let stepped = filtered.execute::<ArrayRef>(&mut SESSION.create_execution_ctx())?;
+        assert!(stepped.is::<OnPair>());
+        assert_eq!(stepped.len(), 2);
+        Ok(())
+    }
+}
diff --git a/encodings/experimental/onpair/src/compute/mod.rs b/encodings/experimental/onpair/src/compute/mod.rs
index 4ad5f48f578..46cf8bf8bab 100644
--- a/encodings/experimental/onpair/src/compute/mod.rs
+++ b/encodings/experimental/onpair/src/compute/mod.rs
@@ -5,4 +5,5 @@ mod byte_length;
 mod cast;
 mod compare;
 mod filter;
+mod like;
 mod slice;
diff --git a/encodings/experimental/onpair/src/kernel.rs b/encodings/experimental/onpair/src/kernel.rs
index 8863d750a72..ed216bfa904 100644
--- a/encodings/experimental/onpair/src/kernel.rs
+++ b/encodings/experimental/onpair/src/kernel.rs
@@ -10,6 +10,8 @@ use vortex_array::scalar_fn::fns::binary::Binary;
 use vortex_array::scalar_fn::fns::binary::CompareExecuteAdaptor;
 use vortex_array::scalar_fn::fns::byte_length::ByteLength;
 use vortex_array::scalar_fn::fns::byte_length::ByteLengthExecuteAdaptor;
+use vortex_array::scalar_fn::fns::like::Like;
+use vortex_array::scalar_fn::fns::like::LikeExecuteAdaptor;
 use vortex_session::VortexSession;
 
 use crate::OnPair;
@@ -24,4 +26,5 @@ pub(super) fn initialize(session: &VortexSession) {
         OnPair,
         ByteLengthExecuteAdaptor(OnPair),
     );
+    kernels.register_execute_parent_kernel(Like.id(), OnPair, LikeExecuteAdaptor(OnPair));
 }
diff --git a/vortex-array/src/arrays/dict/compute/rules.rs b/vortex-array/src/arrays/dict/compute/rules.rs
index b4804218e37..e4d102ee861 100644
--- a/vortex-array/src/arrays/dict/compute/rules.rs
+++ b/vortex-array/src/arrays/dict/compute/rules.rs
@@ -9,12 +9,15 @@ use crate::EqMode;
 use crate::IntoArray;
 use crate::array::ArrayView;
 use crate::array::VTable;
+use crate::arrays::Chunked;
+use crate::arrays::ChunkedArray;
 use crate::arrays::Constant;
 use crate::arrays::ConstantArray;
 use crate::arrays::Dict;
 use crate::arrays::DictArray;
 use crate::arrays::ScalarFn;
 use crate::arrays::ScalarFnArray;
+use crate::arrays::chunked::ChunkedArrayExt;
 use crate::arrays::dict::DictArrayExt;
 use crate::arrays::dict::DictArraySlotsExt;
 use crate::arrays::filter::FilterReduceAdaptor;
@@ -37,11 +40,59 @@ pub(crate) const PARENT_RULES: ParentRuleSet<Dict> = ParentRuleSet::new(&[
     ParentRuleSet::lift(&CastReduceAdaptor(Dict)),
     ParentRuleSet::lift(&MaskReduceAdaptor(Dict)),
     ParentRuleSet::lift(&LikeReduceAdaptor(Dict)),
+    ParentRuleSet::lift(&DictionaryChunkedValuesPullUpRule),
     ParentRuleSet::lift(&DictionaryScalarFnValuesPushDownRule),
     ParentRuleSet::lift(&DictionaryScalarFnCodesPullUpRule),
     ParentRuleSet::lift(&SliceReduceAdaptor(Dict)),
 ]);
 
+/// Pull a common dictionary values array above chunked dictionary codes.
+///
+/// Rewrites `Chunked<Dict<codes, values>>` into `Dict<Chunked<codes>, values>` only when
+/// every child dictionary shares the exact same values array allocation.
+#[derive(Debug)]
+struct DictionaryChunkedValuesPullUpRule;
+
+impl ArrayParentReduceRule<Dict> for DictionaryChunkedValuesPullUpRule {
+    type Parent = Chunked;
+
+    fn reduce_parent(
+        &self,
+        array: ArrayView<'_, Dict>,
+        parent: ArrayView<'_, Chunked>,
+        _child_idx: usize,
+    ) -> VortexResult<Option<ArrayRef>> {
+        let values = array.values();
+        let codes_dtype = array.codes().dtype().clone();
+        let mut code_chunks = Vec::with_capacity(parent.nchunks());
+        let mut all_values_referenced = array.has_all_values_referenced();
+
+        for chunk in parent.iter_chunks() {
+            let Some(dict) = chunk.as_opt::<Dict>() else {
+                return Ok(None);
+            };
+            if dict.codes().dtype() != &codes_dtype {
+                return Ok(None);
+            }
+            if !ArrayRef::ptr_eq(dict.values(), values) {
+                return Ok(None);
+            }
+            all_values_referenced |= dict.has_all_values_referenced();
+            code_chunks.push(dict.codes().clone());
+        }
+
+        let codes = ChunkedArray::try_new(code_chunks, codes_dtype)?.into_array();
+        let dict = DictArray::try_new(codes, values.clone())?;
+        let dict = if all_values_referenced {
+            unsafe { dict.set_all_values_referenced(true) }
+        } else {
+            dict
+        };
+
+        Ok(Some(dict.into_array()))
+    }
+}
+
 /// Push down a scalar function to run only over the values of a dictionary array.
 #[derive(Debug)]
 struct DictionaryScalarFnValuesPushDownRule;
@@ -214,16 +265,72 @@ mod tests {
     use vortex_buffer::buffer;
     use vortex_error::VortexResult;
 
+    use crate::ArrayRef;
     use crate::IntoArray;
     use crate::arrays::BoolArray;
+    use crate::arrays::Chunked;
+    use crate::arrays::ChunkedArray;
     use crate::arrays::Dict;
     use crate::arrays::DictArray;
+    use crate::arrays::PrimitiveArray;
+    use crate::arrays::chunked::ChunkedArrayExt;
     use crate::arrays::dict::DictArrayExt;
+    use crate::arrays::dict::DictArraySlotsExt;
     use crate::arrays::scalar_fn::ScalarFnFactoryExt;
+    use crate::assert_arrays_eq;
+    use crate::executor::VortexSessionExecute;
     use crate::optimizer::ArrayOptimizer;
     use crate::scalar_fn::EmptyOptions;
     use crate::scalar_fn::fns::not::Not;
 
+    #[test]
+    fn chunked_dict_with_shared_values_pulls_values_up() -> VortexResult<()> {
+        let values = buffer![10u32, 20, 30].into_array();
+        let chunk0 = DictArray::try_new(buffer![0u8, 1].into_array(), values.clone())?.into_array();
+        let chunk1 =
+            DictArray::try_new(buffer![2u8, 0, 1].into_array(), values.clone())?.into_array();
+        let array =
+            ChunkedArray::try_new(vec![chunk0, chunk1], values.dtype().clone())?.into_array();
+
+        let optimized = array.optimize()?;
+        let dict = optimized.as_::<Dict>();
+        let codes = dict.codes().as_::<Chunked>();
+
+        assert!(ArrayRef::ptr_eq(dict.values(), &values));
+        assert_eq!(codes.nchunks(), 2);
+        let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
+        assert_arrays_eq!(
+            optimized,
+            PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]),
+            &mut ctx
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn chunked_dict_with_distinct_values_stays_chunked() -> VortexResult<()> {
+        let values0 = buffer![10u32, 20, 30].into_array();
+        let values1 = buffer![10u32, 20, 30].into_array();
+        let chunk0 =
+            DictArray::try_new(buffer![0u8, 1].into_array(), values0.clone())?.into_array();
+        let chunk1 = DictArray::try_new(buffer![2u8, 0, 1].into_array(), values1)?.into_array();
+        let array =
+            ChunkedArray::try_new(vec![chunk0, chunk1], values0.dtype().clone())?.into_array();
+
+        let optimized = array.optimize()?;
+
+        assert!(optimized.is::<Chunked>());
+        let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
+        assert_arrays_eq!(
+            optimized,
+            PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]),
+            &mut ctx
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn scalar_fn_values_pushdown_preserves_all_values_referenced() -> VortexResult<()> {
         let dict = unsafe {
diff --git a/vortex-array/src/arrays/dict/vtable/kernel.rs b/vortex-array/src/arrays/dict/vtable/kernel.rs
index ab750f7d663..79659af18dd 100644
--- a/vortex-array/src/arrays/dict/vtable/kernel.rs
+++ b/vortex-array/src/arrays/dict/vtable/kernel.rs
@@ -4,6 +4,7 @@
 use vortex_session::VortexSession;
 
 use crate::ArrayVTable;
+use crate::arrays::Chunked;
 use crate::arrays::Dict;
 use crate::arrays::dict::TakeExecuteAdaptor;
 use crate::optimizer::kernels::ArrayKernelsExt;
@@ -16,6 +17,7 @@ use crate::scalar_fn::fns::fill_null::FillNullExecuteAdaptor;
 pub(crate) fn initialize(session: &VortexSession) {
     let kernels = session.kernels();
     kernels.register_execute_parent_kernel(Binary.id(), Dict, CompareExecuteAdaptor(Dict));
+    kernels.register_execute_parent_kernel(Dict.id(), Chunked, TakeExecuteAdaptor(Chunked));
     kernels.register_execute_parent_kernel(Dict.id(), Dict, TakeExecuteAdaptor(Dict));
     kernels.register_execute_parent_kernel(FillNull.id(), Dict, FillNullExecuteAdaptor(Dict));
 }
diff --git a/vortex-array/src/arrays/filter/kernel.rs b/vortex-array/src/arrays/filter/kernel.rs
index 21bd225bf55..4213d692509 100644
--- a/vortex-array/src/arrays/filter/kernel.rs
+++ b/vortex-array/src/arrays/filter/kernel.rs
@@ -26,11 +26,14 @@ use crate::kernel::ExecuteParentKernel;
 use crate::matcher::Matcher;
 use crate::optimizer::kernels::ArrayKernelsExt;
 use crate::optimizer::rules::ArrayParentReduceRule;
+use crate::scalar_fn::ScalarFnVTable;
+use crate::scalar_fn::fns::like::Like;
+use crate::scalar_fn::fns::like::LikeFilterExecuteAdaptor;
 
 pub(crate) fn initialize(session: &VortexSession) {
-    session
-        .kernels()
-        .register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
+    let kernels = session.kernels();
+    kernels.register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
+    kernels.register_execute_parent_kernel(Like.id(), Filter, LikeFilterExecuteAdaptor);
 }
 
 pub trait FilterReduce: VTable {
diff --git a/vortex-array/src/arrays/shared/vtable.rs b/vortex-array/src/arrays/shared/vtable.rs
index 3c3a09216d2..fc03255d785 100644
--- a/vortex-array/src/arrays/shared/vtable.rs
+++ b/vortex-array/src/arrays/shared/vtable.rs
@@ -113,6 +113,14 @@ impl VTable for Shared {
             .get_or_compute(|source| source.clone().execute::<Canonical>(ctx))
             .map(ExecutionResult::done)
     }
+
+    fn reduce_parent(
+        array: ArrayView<'_, Self>,
+        parent: &ArrayRef,
+        child_idx: usize,
+    ) -> VortexResult<Option<ArrayRef>> {
+        array.current_array_ref().reduce_parent(parent, child_idx)
+    }
 }
 impl OperationsVTable<Shared> for Shared {
     fn scalar_at(
diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs
index 0c083f18ee5..b09c2d048b9 100644
--- a/vortex-array/src/executor.rs
+++ b/vortex-array/src/executor.rs
@@ -34,6 +34,8 @@ use crate::ArrayRef;
 use crate::Canonical;
 use crate::IntoArray;
 use crate::array::ArrayId;
+use crate::arrays::Shared;
+use crate::arrays::shared::SharedArrayExt;
 use crate::builders::ArrayBuilder;
 use crate::builders::builder_with_capacity_in;
 use crate::dtype::DType;
@@ -568,6 +570,35 @@ fn execute_parent_for_child(
     slot_idx: usize,
     kernels: &ParentExecutionKernels,
     ctx: &mut ExecutionCtx,
+) -> VortexResult<Option<ArrayRef>> {
+    if let Some(result) = execute_parent_for_exact_child(parent, child, slot_idx, kernels, ctx)? {
+        return Ok(Some(result));
+    }
+
+    // Shared is a transparent cache wrapper. Try kernels against the wrapped source/current array
+    // before forcing Shared to canonicalize and populate its cache.
+    let mut current = child.clone();
+    while let Some(source) = current
+        .as_opt::<Shared>()
+        .map(|shared| shared.current_array_ref().clone())
+    {
+        if let Some(result) =
+            execute_parent_for_exact_child(parent, &source, slot_idx, kernels, ctx)?
+        {
+            return Ok(Some(result));
+        }
+        current = source;
+    }
+
+    Ok(None)
+}
+
+fn execute_parent_for_exact_child(
+    parent: &ArrayRef,
+    child: &ArrayRef,
+    slot_idx: usize,
+    kernels: &ParentExecutionKernels,
+    ctx: &mut ExecutionCtx,
 ) -> VortexResult<Option<ArrayRef>> {
     let key = execute_parent_key(parent.encoding_id(), child.encoding_id());
     if let Some(plugins) = kernels.get(&key) {
diff --git a/vortex-array/src/scalar_fn/fns/like/kernel.rs b/vortex-array/src/scalar_fn/fns/like/kernel.rs
index b3b683212ff..e62f41d9f92 100644
--- a/vortex-array/src/scalar_fn/fns/like/kernel.rs
+++ b/vortex-array/src/scalar_fn/fns/like/kernel.rs
@@ -6,9 +6,12 @@ use vortex_error::VortexResult;
 
 use crate::ArrayRef;
 use crate::ExecutionCtx;
+use crate::IntoArray;
 use crate::array::ArrayView;
 use crate::array::VTable;
+use crate::arrays::Filter;
 use crate::arrays::ScalarFn;
+use crate::arrays::ScalarFnArray;
 use crate::arrays::scalar_fn::ExactScalarFn;
 use crate::arrays::scalar_fn::ScalarFnArrayExt;
 use crate::arrays::scalar_fn::ScalarFnArrayView;
@@ -105,3 +108,45 @@ where
         <V as LikeKernel>::like(array, pattern, options, ctx)
     }
 }
+
+/// Adaptor that executes a filtered input before evaluating LIKE.
+///
+/// This preserves sparse row demand for `LIKE(Filter(child), constant)`: the filter executes into a
+/// filtered child first, then the regular child-specific LIKE execute-parent kernel can run over
+/// only the selected rows.
+#[derive(Default, Debug)]
+pub struct LikeFilterExecuteAdaptor;
+
+impl ExecuteParentKernel<Filter> for LikeFilterExecuteAdaptor {
+    type Parent = ExactScalarFn<LikeExpr>;
+
+    fn execute_parent(
+        &self,
+        array: ArrayView<'_, Filter>,
+        parent: ScalarFnArrayView<'_, LikeExpr>,
+        child_idx: usize,
+        ctx: &mut ExecutionCtx,
+    ) -> VortexResult<Option<ArrayRef>> {
+        if child_idx != 0 {
+            return Ok(None);
+        }
+        let scalar_fn_array = parent
+            .as_opt::<ScalarFn>()
+            .vortex_expect("ExactScalarFn matcher confirmed ScalarFnArray");
+        let filtered = array.array().clone().execute::<ArrayRef>(ctx)?;
+        let children = scalar_fn_array
+            .iter_children()
+            .enumerate()
+            .map(|(idx, child)| {
+                if idx == child_idx {
+                    filtered.clone()
+                } else {
+                    child.clone()
+                }
+            })
+            .collect();
+        Ok(Some(
+            ScalarFnArray::try_new(scalar_fn_array.scalar_fn().clone(), children)?.into_array(),
+        ))
+    }
+}
diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs
index 804218779c8..3cb9fb1bab9 100644
--- a/vortex-file/src/strategy.rs
+++ b/vortex-file/src/strategy.rs
@@ -32,6 +32,8 @@ use vortex_array::dtype::FieldPath;
 use vortex_btrblocks::BtrBlocksCompressorBuilder;
 use vortex_btrblocks::SchemeExt;
 use vortex_btrblocks::schemes::integer::IntDictScheme;
+#[cfg(feature = "unstable_encodings")]
+use vortex_btrblocks::schemes::string::OnPairScheme;
 use vortex_bytebool::ByteBool;
 use vortex_datetime_parts::DateTimeParts;
 use vortex_decimal_byte_parts::DecimalByteParts;
@@ -160,8 +162,17 @@ impl Default for WriteStrategyBuilder {
     /// Create a new empty builder. It can be further configured,
     /// and then finally built yielding the [`LayoutStrategy`].
     fn default() -> Self {
+        #[cfg_attr(not(feature = "unstable_encodings"), allow(unused_mut))]
+        let mut compressor = BtrBlocksCompressorBuilder::default();
+        #[cfg(feature = "unstable_encodings")]
+        {
+            // OnPair currently optimizes for compressed size, but its string predicate kernels are
+            // not yet competitive with FSST for the scan-heavy default file format.
+            compressor = compressor.exclude_schemes([OnPairScheme.id()]);
+        }
+
         Self {
-            compressor: CompressorConfig::BtrBlocks(BtrBlocksCompressorBuilder::default()),
+            compressor: CompressorConfig::BtrBlocks(compressor),
             row_block_size: 8192,
             field_writers: HashMap::new(),
             allow_encodings: Some(ALLOWED_ENCODINGS.clone()),