From e31fafe11c49f0eee6766793700acf50e33c4cd1 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Thu, 30 Apr 2026 15:23:08 +0200 Subject: [PATCH 01/11] Refactor generic InList static filter helpers --- .../in_list/array_static_filter.rs | 163 ++++++++++-------- 1 file changed, 90 insertions(+), 73 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs b/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs index 93bfcd49600d0..d1befdc9cba12 100644 --- a/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs @@ -42,71 +42,6 @@ pub(super) struct ArrayStaticFilter { map: HashMap, } -impl StaticFilter for ArrayStaticFilter { - fn null_count(&self) -> usize { - self.in_array.null_count() - } - - /// Checks if values in `v` are contained in the `in_array` using this hash set for lookup. - fn contains(&self, v: &dyn Array, negated: bool) -> Result { - // Null type comparisons always return null (SQL three-valued logic) - if v.data_type() == &DataType::Null - || self.in_array.data_type() == &DataType::Null - { - let nulls = NullBuffer::new_null(v.len()); - return Ok(BooleanArray::new( - BooleanBuffer::new_unset(v.len()), - Some(nulls), - )); - } - - // Unwrap dictionary-encoded needles when the value type matches - // in_array, evaluating against the dictionary values and mapping - // back via keys. - downcast_dictionary_array! { - v => { - // Only unwrap when the haystack (in_array) type matches - // the dictionary value type - if v.values().data_type() == self.in_array.data_type() { - let values_contains = self.contains(v.values().as_ref(), negated)?; - let result = take(&values_contains, v.keys(), None)?; - return Ok(downcast_array(result.as_ref())); - } - } - _ => {} - } - - let needle_nulls = v.logical_nulls(); - let needle_nulls = needle_nulls.as_ref(); - let haystack_has_nulls = self.in_array.null_count() != 0; - - with_hashes([v], &self.state, |hashes| { - let cmp = make_comparator(v, &self.in_array, SortOptions::default())?; - Ok((0..v.len()) - .map(|i| { - // SQL three-valued logic: null IN (...) is always null - if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) { - return None; - } - - let hash = hashes[i]; - let contains = self - .map - .raw_entry() - .from_hash(hash, |idx| cmp(i, *idx).is_eq()) - .is_some(); - - match contains { - true => Some(!negated), - false if haystack_has_nulls => None, - false => Some(negated), - } - }) - .collect()) - }) - } -} - impl ArrayStaticFilter { /// Computes a [`StaticFilter`] for the provided [`Array`] if there /// are nulls present or there are more than the configured number of @@ -125,10 +60,23 @@ impl ArrayStaticFilter { } let state = RandomState::default(); + let map = Self::build_haystack_map(&in_array, &state)?; + + Ok(Self { + in_array, + state, + map, + }) + } + + fn build_haystack_map( + haystack: &ArrayRef, + state: &RandomState, + ) -> Result> { let mut map: HashMap = HashMap::with_hasher(()); - with_hashes([&in_array], &state, |hashes| -> Result<()> { - let cmp = make_comparator(&in_array, &in_array, SortOptions::default())?; + with_hashes([haystack.as_ref()], state, |hashes| -> Result<()> { + let cmp = make_comparator(haystack, haystack, SortOptions::default())?; let insert_value = |idx| { let hash = hashes[idx]; @@ -140,21 +88,90 @@ impl ArrayStaticFilter { } }; - match in_array.nulls() { + match haystack.nulls() { Some(nulls) => { BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len()) .for_each(insert_value) } - None => (0..in_array.len()).for_each(insert_value), + None => (0..haystack.len()).for_each(insert_value), } Ok(()) })?; - Ok(Self { - in_array, - state, - map, + Ok(map) + } + + fn find_needles_in_haystack( + &self, + needles: &dyn Array, + negated: bool, + ) -> Result { + let needle_nulls = needles.logical_nulls(); + let needle_nulls = needle_nulls.as_ref(); + let haystack_has_nulls = self.in_array.null_count() != 0; + + with_hashes([needles], &self.state, |hashes| { + let cmp = make_comparator(needles, &self.in_array, SortOptions::default())?; + Ok((0..needles.len()) + .map(|i| { + // SQL three-valued logic: null IN (...) is always null + if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) { + return None; + } + + let hash = hashes[i]; + let contains = self + .map + .raw_entry() + .from_hash(hash, |idx| cmp(i, *idx).is_eq()) + .is_some(); + + match contains { + true => Some(!negated), + false if haystack_has_nulls => None, + false => Some(negated), + } + }) + .collect()) }) } } + +impl StaticFilter for ArrayStaticFilter { + fn null_count(&self) -> usize { + self.in_array.null_count() + } + + /// Checks if values in `v` are contained in the `in_array` using this hash set for lookup. + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + // Null type comparisons always return null (SQL three-valued logic) + if v.data_type() == &DataType::Null + || self.in_array.data_type() == &DataType::Null + { + let nulls = NullBuffer::new_null(v.len()); + return Ok(BooleanArray::new( + BooleanBuffer::new_unset(v.len()), + Some(nulls), + )); + } + + // Unwrap dictionary-encoded needles when the value type matches + // in_array, evaluating against the dictionary values and mapping + // back via keys. + downcast_dictionary_array! { + v => { + // Only unwrap when the haystack (in_array) type matches + // the dictionary value type + if v.values().data_type() == self.in_array.data_type() { + let values_contains = self.contains(v.values().as_ref(), negated)?; + let result = take(&values_contains, v.keys(), None)?; + return Ok(downcast_array(result.as_ref())); + } + } + _ => {} + } + + self.find_needles_in_haystack(v, negated) + } +} From afc196be08299376ae6414d16e7293039e158185 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Thu, 30 Apr 2026 15:24:36 +0200 Subject: [PATCH 02/11] Build InList results from bitmaps --- .../physical-expr/src/expressions/in_list.rs | 1 + .../in_list/array_static_filter.rs | 36 +++--- .../src/expressions/in_list/result.rs | 105 ++++++++++++++++++ 3 files changed, 121 insertions(+), 21 deletions(-) create mode 100644 datafusion/physical-expr/src/expressions/in_list/result.rs diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 1d3e244d73971..50ff3936937bf 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -38,6 +38,7 @@ use datafusion_expr::{ColumnarValue, expr_vec_fmt}; mod array_static_filter; mod primitive_filter; +mod result; mod static_filter; mod strategy; diff --git a/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs b/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs index d1befdc9cba12..f6c24b15b0da7 100644 --- a/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs @@ -28,6 +28,7 @@ use datafusion_common::Result; use datafusion_common::hash_utils::{RandomState, with_hashes}; use hashbrown::hash_map::RawEntryMut; +use super::result::build_in_list_result; use super::static_filter::StaticFilter; /// Static filter for InList that stores the array and hash set for O(1) lookups @@ -108,32 +109,25 @@ impl ArrayStaticFilter { negated: bool, ) -> Result { let needle_nulls = needles.logical_nulls(); - let needle_nulls = needle_nulls.as_ref(); let haystack_has_nulls = self.in_array.null_count() != 0; - with_hashes([needles], &self.state, |hashes| { + with_hashes([needles], &self.state, |needle_hashes| { let cmp = make_comparator(needles, &self.in_array, SortOptions::default())?; - Ok((0..needles.len()) - .map(|i| { - // SQL three-valued logic: null IN (...) is always null - if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) { - return None; - } - - let hash = hashes[i]; - let contains = self - .map + + Ok(build_in_list_result( + needles.len(), + needle_nulls.as_ref(), + haystack_has_nulls, + negated, + #[inline(always)] + |i| { + let hash = needle_hashes[i]; + self.map .raw_entry() .from_hash(hash, |idx| cmp(i, *idx).is_eq()) - .is_some(); - - match contains { - true => Some(!negated), - false if haystack_has_nulls => None, - false => Some(negated), - } - }) - .collect()) + .is_some() + }, + )) }) } } diff --git a/datafusion/physical-expr/src/expressions/in_list/result.rs b/datafusion/physical-expr/src/expressions/in_list/result.rs new file mode 100644 index 0000000000000..3ebdbfe19f743 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/in_list/result.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Result building helpers for InList operations. +//! +//! This module provides unified logic for building BooleanArray results +//! from IN list membership tests, handling null propagation correctly +//! according to SQL three-valued logic. + +use arrow::array::BooleanArray; +use arrow::buffer::{BooleanBuffer, NullBuffer}; + +// Truth table for (needle_nulls, haystack_has_nulls, negated): +// (Some, true, false) => values: valid & contains, nulls: valid & contains +// (None, true, false) => values: contains, nulls: contains +// (Some, true, true) => values: valid & !contains, nulls: valid & contains +// (None, true, true) => values: !contains, nulls: contains +// (Some, false, false) => values: valid & contains, nulls: valid +// (Some, false, true) => values: valid & !contains, nulls: valid +// (None, false, false) => values: contains, nulls: none +// (None, false, true) => values: !contains, nulls: none + +/// Builds a BooleanArray result for IN list operations. +/// +/// This function handles the null propagation logic for SQL IN lists: +/// - If the needle value is null, the result is null +/// - If the needle is not in the set and the haystack has nulls, the result is null +/// - Otherwise, the result is true/false based on membership and negation +/// +/// This version computes contains for all positions, including nulls, then applies +/// null masking via bitmap operations. +#[inline] +pub(crate) fn build_in_list_result( + len: usize, + needle_nulls: Option<&NullBuffer>, + haystack_has_nulls: bool, + negated: bool, + contains: C, +) -> BooleanArray +where + C: FnMut(usize) -> bool, +{ + let contains_buf = BooleanBuffer::collect_bool(len, contains); + build_result_from_contains(needle_nulls, haystack_has_nulls, negated, contains_buf) +} + +/// Builds a BooleanArray result from a pre-computed contains buffer. +/// +/// This version does not assume contains_buf is pre-masked at null positions. +/// It handles nulls using bitmap operations. +#[inline] +pub(crate) fn build_result_from_contains( + needle_nulls: Option<&NullBuffer>, + haystack_has_nulls: bool, + negated: bool, + contains_buf: BooleanBuffer, +) -> BooleanArray { + match (needle_nulls, haystack_has_nulls, negated) { + // Haystack has nulls: result is null unless value is found. + (Some(v), true, false) => { + // values: valid & contains, nulls: valid & contains + let values = v.inner() & &contains_buf; + BooleanArray::new(values.clone(), Some(NullBuffer::new(values))) + } + (None, true, false) => { + BooleanArray::new(contains_buf.clone(), Some(NullBuffer::new(contains_buf))) + } + (Some(v), true, true) => { + // NOT IN with nulls: false if found, null if not found or needle null. + // values: valid & !contains, nulls: valid & contains + let valid = v.inner(); + let values = valid & &(!&contains_buf); + let nulls = valid & &contains_buf; + BooleanArray::new(values, Some(NullBuffer::new(nulls))) + } + (None, true, true) => { + BooleanArray::new(!&contains_buf, Some(NullBuffer::new(contains_buf))) + } + // Haystack has no nulls: result validity follows needle validity. + (Some(v), false, false) => { + // values: valid & contains, nulls: valid + BooleanArray::new(v.inner() & &contains_buf, Some(v.clone())) + } + (Some(v), false, true) => { + // values: valid & !contains, nulls: valid + BooleanArray::new(v.inner() & &(!&contains_buf), Some(v.clone())) + } + (None, false, false) => BooleanArray::new(contains_buf, None), + (None, false, true) => BooleanArray::new(!&contains_buf, None), + } +} From a84579d35247bf6318ed58da6df73a0c622351ab Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Thu, 30 Apr 2026 15:25:25 +0200 Subject: [PATCH 03/11] Optimize generic InList static filtering --- .../in_list/array_static_filter.rs | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs b/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs index f6c24b15b0da7..75e92dbcc59b4 100644 --- a/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/array_static_filter.rs @@ -23,10 +23,9 @@ use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::{SortOptions, take}; use arrow::datatypes::DataType; use arrow::util::bit_iterator::BitIndexIterator; -use datafusion_common::HashMap; use datafusion_common::Result; use datafusion_common::hash_utils::{RandomState, with_hashes}; -use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashTable; use super::result::build_in_list_result; use super::static_filter::StaticFilter; @@ -36,11 +35,8 @@ use super::static_filter::StaticFilter; pub(super) struct ArrayStaticFilter { in_array: ArrayRef, state: RandomState, - /// Used to provide a lookup from value to in list index - /// - /// Note: usize::hash is not used, instead the raw entry - /// API is used to store entries w.r.t their value - map: HashMap, + /// Stores indices into `in_array` for O(1) lookups. + table: HashTable, } impl ArrayStaticFilter { @@ -56,36 +52,34 @@ impl ArrayStaticFilter { return Ok(ArrayStaticFilter { in_array, state: RandomState::default(), - map: HashMap::with_hasher(()), + table: HashTable::new(), }); } let state = RandomState::default(); - let map = Self::build_haystack_map(&in_array, &state)?; + let table = Self::build_haystack_table(&in_array, &state)?; Ok(Self { in_array, state, - map, + table, }) } - fn build_haystack_map( + fn build_haystack_table( haystack: &ArrayRef, state: &RandomState, - ) -> Result> { - let mut map: HashMap = HashMap::with_hasher(()); + ) -> Result> { + let mut table = HashTable::new(); with_hashes([haystack.as_ref()], state, |hashes| -> Result<()> { let cmp = make_comparator(haystack, haystack, SortOptions::default())?; let insert_value = |idx| { let hash = hashes[idx]; - if let RawEntryMut::Vacant(v) = map - .raw_entry_mut() - .from_hash(hash, |x| cmp(*x, idx).is_eq()) - { - v.insert_with_hasher(hash, idx, (), |x| hashes[*x]); + // Only insert if not already present (deduplication) + if table.find(hash, |&x| cmp(x, idx).is_eq()).is_none() { + table.insert_unique(hash, idx, |&x| hashes[x]); } }; @@ -100,7 +94,7 @@ impl ArrayStaticFilter { Ok(()) })?; - Ok(map) + Ok(table) } fn find_needles_in_haystack( @@ -122,10 +116,7 @@ impl ArrayStaticFilter { #[inline(always)] |i| { let hash = needle_hashes[i]; - self.map - .raw_entry() - .from_hash(hash, |idx| cmp(i, *idx).is_eq()) - .is_some() + self.table.find(hash, |&idx| cmp(i, idx).is_eq()).is_some() }, )) }) From 5351b958fe047fdf4a5fe099b40fbf7007743c2c Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 9 Jan 2026 11:59:30 +0100 Subject: [PATCH 04/11] Implement Bitmap Filter for UInt8 (Stack-based) Replaces HashSet with a 32-byte stack-allocated bitmap. Provides O(1) membership testing via bit-shifting, significantly reducing memory overhead and improving cache locality. Triggers for UInt8 arrays. --- .../expressions/in_list/primitive_filter.rs | 154 +++++++++++++++--- .../src/expressions/in_list/static_filter.rs | 17 ++ .../src/expressions/in_list/strategy.rs | 2 +- 3 files changed, 149 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index 2c084a1cb247b..f665025581f2e 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -15,16 +15,77 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ - Array, ArrayRef, AsArray, BooleanArray, downcast_array, downcast_dictionary_array, -}; +//! Optimized primitive type filters for InList expressions. +//! +//! This module provides membership tests for Arrow primitive types. + +use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; -use arrow::compute::take; use arrow::datatypes::*; use datafusion_common::{HashSet, Result, exec_datafusion_err}; use std::hash::{Hash, Hasher}; -use super::static_filter::StaticFilter; +use super::result::build_in_list_result; +use super::static_filter::{StaticFilter, handle_dictionary}; + +/// Bitmap filter for O(1) set membership via single bit test. +/// +/// `UInt8` has only 256 possible values, so the filter stores membership in a +/// 256-bit bitmap instead of using a hash table. +pub(super) struct UInt8BitmapFilter { + null_count: usize, + bits: [u64; 4], +} + +impl UInt8BitmapFilter { + pub(super) fn try_new(in_array: &ArrayRef) -> Result { + let prim_array = in_array.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array") + })?; + let mut bits = [0u64; 4]; + for v in prim_array.iter().flatten() { + let index = v as usize; + bits[index / 64] |= 1u64 << (index % 64); + } + Ok(Self { + null_count: prim_array.null_count(), + bits, + }) + } + + #[inline(always)] + fn check(&self, needle: u8) -> bool { + let index = needle as usize; + (self.bits[index / 64] >> (index % 64)) & 1 != 0 + } +} + +impl StaticFilter for UInt8BitmapFilter { + fn null_count(&self) -> usize { + self.null_count + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + let v = v.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array") + })?; + let input_values = v.values(); + Ok(build_in_list_result( + v.len(), + v.nulls(), + self.null_count > 0, + negated, + #[inline(always)] + |i| { + // SAFETY: `build_in_list_result` invokes this closure for + // indices in `0..v.len()`, which matches `input_values.len()`. + let needle = unsafe { *input_values.get_unchecked(i) }; + self.check(needle) + }, + )) + } +} /// Wrapper for f32 that implements Hash and Eq using bit comparison. /// This treats NaN values as equal to each other when they have the same bit pattern. @@ -94,9 +155,13 @@ macro_rules! primitive_static_filter { impl $Name { pub(super) fn try_new(in_array: &ArrayRef) -> Result { - let in_array = in_array - .as_primitive_opt::<$ArrowType>() - .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?; + let in_array = + in_array.as_primitive_opt::<$ArrowType>().ok_or_else(|| { + exec_datafusion_err!( + "Failed to downcast an array to a '{}' array", + stringify!($ArrowType) + ) + })?; let mut values = HashSet::with_capacity(in_array.len()); let null_count = in_array.null_count(); @@ -115,19 +180,14 @@ macro_rules! primitive_static_filter { } fn contains(&self, v: &dyn Array, negated: bool) -> Result { - // Handle dictionary arrays by recursing on the values - downcast_dictionary_array! { - v => { - let values_contains = self.contains(v.values().as_ref(), negated)?; - let result = take(&values_contains, v.keys(), None)?; - return Ok(downcast_array(result.as_ref())) - } - _ => {} - } + handle_dictionary!(self, v, negated); - let v = v - .as_primitive_opt::<$ArrowType>() - .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?; + let v = v.as_primitive_opt::<$ArrowType>().ok_or_else(|| { + exec_datafusion_err!( + "Failed to downcast an array to a '{}' array", + stringify!($ArrowType) + ) + })?; let haystack_has_nulls = self.null_count > 0; let needle_values = v.values(); @@ -188,8 +248,10 @@ macro_rules! primitive_static_filter { } (true, true) => { // Both have nulls - combine needle nulls with haystack-induced nulls - let needle_validity = needle_nulls.map(|n| n.inner().clone()) - .unwrap_or_else(|| BooleanBuffer::new_set(needle_values.len())); + let needle_validity = + needle_nulls.map(|n| n.inner().clone()).unwrap_or_else( + || BooleanBuffer::new_set(needle_values.len()), + ); // Valid when original "in set" is true (see above) let haystack_validity = if negated { @@ -215,7 +277,6 @@ primitive_static_filter!(Int8StaticFilter, Int8Type); primitive_static_filter!(Int16StaticFilter, Int16Type); primitive_static_filter!(Int32StaticFilter, Int32Type); primitive_static_filter!(Int64StaticFilter, Int64Type); -primitive_static_filter!(UInt8StaticFilter, UInt8Type); primitive_static_filter!(UInt16StaticFilter, UInt16Type); primitive_static_filter!(UInt32StaticFilter, UInt32Type); primitive_static_filter!(UInt64StaticFilter, UInt64Type); @@ -231,3 +292,50 @@ macro_rules! float_static_filter { // Generate specialized filters for float types using ordered wrappers float_static_filter!(Float32StaticFilter, Float32Type, OrderedFloat32); float_static_filter!(Float64StaticFilter, Float64Type, OrderedFloat64); + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + use arrow::array::{DictionaryArray, Int8Array, UInt8Array}; + + fn assert_contains( + filter: &UInt8BitmapFilter, + needles: &dyn Array, + expected: Vec>, + ) -> Result<()> { + assert_eq!( + filter.contains(needles, false)?, + BooleanArray::from(expected) + ); + Ok(()) + } + + #[test] + fn bitmap_filter_u8_handles_nulls() -> Result<()> { + let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)])); + let filter = UInt8BitmapFilter::try_new(&haystack)?; + let needles = UInt8Array::from(vec![Some(1), Some(2), None, Some(3)]); + + assert_contains(&filter, &needles, vec![Some(true), None, None, Some(true)])?; + assert_eq!( + filter.contains(&needles, true)?, + BooleanArray::from(vec![Some(false), None, None, Some(false)]) + ); + + Ok(()) + } + + #[test] + fn bitmap_filter_u8_handles_dictionary_needles() -> Result<()> { + let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)])); + let filter = UInt8BitmapFilter::try_new(&haystack)?; + + let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]); + let values = Arc::new(UInt8Array::from(vec![Some(1), Some(2), Some(3)])); + let needles = DictionaryArray::try_new(keys, values)?; + + assert_contains(&filter, &needles, vec![Some(true), None, None, Some(true)]) + } +} diff --git a/datafusion/physical-expr/src/expressions/in_list/static_filter.rs b/datafusion/physical-expr/src/expressions/in_list/static_filter.rs index 218bd27950266..3c964d4183474 100644 --- a/datafusion/physical-expr/src/expressions/in_list/static_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/static_filter.rs @@ -35,3 +35,20 @@ pub(super) trait StaticFilter { /// implementation unwraps the dictionary and operates on its values. fn contains(&self, v: &dyn Array, negated: bool) -> Result; } + +/// Evaluate dictionary-encoded needles by applying a filter to dictionary +/// values and remapping the result through the keys. +macro_rules! handle_dictionary { + ($self:ident, $v:ident, $negated:ident) => { + arrow::array::downcast_dictionary_array! { + $v => { + let values_contains = $self.contains($v.values().as_ref(), $negated)?; + let result = arrow::compute::take(&values_contains, $v.keys(), None)?; + return Ok(arrow::array::downcast_array(result.as_ref())) + } + _ => {} + } + }; +} + +pub(super) use handle_dictionary; diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index b7ee3dd1a3b9d..1fb8e03fe2040 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -42,7 +42,7 @@ pub(super) fn instantiate_static_filter( DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)), DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), - DataType::UInt8 => Ok(Arc::new(UInt8StaticFilter::try_new(&in_array)?)), + DataType::UInt8 => Ok(Arc::new(UInt8BitmapFilter::try_new(&in_array)?)), DataType::UInt16 => Ok(Arc::new(UInt16StaticFilter::try_new(&in_array)?)), DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), From 5514d78b5037867c273dce36f39536499f87cbba Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 9 Jan 2026 11:59:30 +0100 Subject: [PATCH 05/11] Extend Bitmap Filter to UInt16 (Heap-based) Implements an 8 KB heap-allocated bitmap for UInt16. Maintains O(1) performance while handling the larger value space. Triggers for UInt16 arrays. --- .../expressions/in_list/primitive_filter.rs | 89 ++++++++++++++++++- .../src/expressions/in_list/strategy.rs | 2 +- 2 files changed, 87 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index f665025581f2e..90efe611934a4 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -87,6 +87,65 @@ impl StaticFilter for UInt8BitmapFilter { } } +/// Bitmap filter for O(1) `UInt16` set membership via single bit test. +/// +/// `UInt16` has 65,536 possible values, so the filter stores membership in an +/// 8 KiB heap-allocated bitmap instead of using a hash table. +pub(super) struct UInt16BitmapFilter { + null_count: usize, + bits: Box<[u64; 1024]>, +} + +impl UInt16BitmapFilter { + pub(super) fn try_new(in_array: &ArrayRef) -> Result { + let prim_array = in_array.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array") + })?; + let mut bits = Box::new([0u64; 1024]); + for v in prim_array.iter().flatten() { + let index = v as usize; + bits[index / 64] |= 1u64 << (index % 64); + } + Ok(Self { + null_count: prim_array.null_count(), + bits, + }) + } + + #[inline(always)] + fn check(&self, needle: u16) -> bool { + let index = needle as usize; + (self.bits[index / 64] >> (index % 64)) & 1 != 0 + } +} + +impl StaticFilter for UInt16BitmapFilter { + fn null_count(&self) -> usize { + self.null_count + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + let v = v.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array") + })?; + let input_values = v.values(); + Ok(build_in_list_result( + v.len(), + v.nulls(), + self.null_count > 0, + negated, + #[inline(always)] + |i| { + // SAFETY: `build_in_list_result` invokes this closure for + // indices in `0..v.len()`, which matches `input_values.len()`. + let needle = unsafe { *input_values.get_unchecked(i) }; + self.check(needle) + }, + )) + } +} + /// Wrapper for f32 that implements Hash and Eq using bit comparison. /// This treats NaN values as equal to each other when they have the same bit pattern. #[derive(Clone, Copy)] @@ -277,7 +336,6 @@ primitive_static_filter!(Int8StaticFilter, Int8Type); primitive_static_filter!(Int16StaticFilter, Int16Type); primitive_static_filter!(Int32StaticFilter, Int32Type); primitive_static_filter!(Int64StaticFilter, Int64Type); -primitive_static_filter!(UInt16StaticFilter, UInt16Type); primitive_static_filter!(UInt32StaticFilter, UInt32Type); primitive_static_filter!(UInt64StaticFilter, UInt64Type); @@ -298,10 +356,10 @@ mod tests { use super::*; use std::sync::Arc; - use arrow::array::{DictionaryArray, Int8Array, UInt8Array}; + use arrow::array::{DictionaryArray, Int8Array, UInt8Array, UInt16Array}; fn assert_contains( - filter: &UInt8BitmapFilter, + filter: &dyn StaticFilter, needles: &dyn Array, expected: Vec>, ) -> Result<()> { @@ -338,4 +396,29 @@ mod tests { assert_contains(&filter, &needles, vec![Some(true), None, None, Some(true)]) } + + #[test] + fn bitmap_filter_u16_handles_boundaries_and_nulls() -> Result<()> { + let haystack: ArrayRef = Arc::new(UInt16Array::from(vec![ + Some(0), + None, + Some(1024), + Some(u16::MAX), + ])); + let filter = UInt16BitmapFilter::try_new(&haystack)?; + let needles = + UInt16Array::from(vec![Some(0), Some(1), Some(1024), Some(u16::MAX), None]); + + assert_contains( + &filter, + &needles, + vec![Some(true), None, Some(true), Some(true), None], + )?; + assert_eq!( + filter.contains(&needles, true)?, + BooleanArray::from(vec![Some(false), None, Some(false), Some(false), None]) + ); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index 1fb8e03fe2040..aec94bddb920b 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -43,7 +43,7 @@ pub(super) fn instantiate_static_filter( DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), DataType::UInt8 => Ok(Arc::new(UInt8BitmapFilter::try_new(&in_array)?)), - DataType::UInt16 => Ok(Arc::new(UInt16StaticFilter::try_new(&in_array)?)), + DataType::UInt16 => Ok(Arc::new(UInt16BitmapFilter::try_new(&in_array)?)), DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), // Float primitive types (use ordered wrappers for Hash/Eq) From 8d99ad0ce5e2ce61419e5121751d0a9b18a2a2f8 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 19 Jun 2026 07:25:50 +0200 Subject: [PATCH 06/11] Refactor bitmap filters behind shared config --- .../expressions/in_list/primitive_filter.rs | 148 ++++++++++-------- .../src/expressions/in_list/strategy.rs | 8 +- 2 files changed, 87 insertions(+), 69 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index 90efe611934a4..de5df6ef06282 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -28,83 +28,98 @@ use std::hash::{Hash, Hasher}; use super::result::build_in_list_result; use super::static_filter::{StaticFilter, handle_dictionary}; -/// Bitmap filter for O(1) set membership via single bit test. -/// -/// `UInt8` has only 256 possible values, so the filter stores membership in a -/// 256-bit bitmap instead of using a hash table. -pub(super) struct UInt8BitmapFilter { - null_count: usize, - bits: [u64; 4], +pub(super) trait BitmapStorage: Send + Sync { + fn new_zeroed() -> Self; + fn set_bit(&mut self, index: usize); + fn get_bit(&self, index: usize) -> bool; } -impl UInt8BitmapFilter { - pub(super) fn try_new(in_array: &ArrayRef) -> Result { - let prim_array = in_array.as_primitive_opt::().ok_or_else(|| { - exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array") - })?; - let mut bits = [0u64; 4]; - for v in prim_array.iter().flatten() { - let index = v as usize; - bits[index / 64] |= 1u64 << (index % 64); - } - Ok(Self { - null_count: prim_array.null_count(), - bits, - }) +impl BitmapStorage for [u64; 4] { + #[inline] + fn new_zeroed() -> Self { + [0u64; 4] + } + #[inline] + fn set_bit(&mut self, index: usize) { + self[index / 64] |= 1u64 << (index % 64); } + #[inline(always)] + fn get_bit(&self, index: usize) -> bool { + (self[index / 64] >> (index % 64)) & 1 != 0 + } +} +impl BitmapStorage for Box<[u64; 1024]> { + #[inline] + fn new_zeroed() -> Self { + Box::new([0u64; 1024]) + } + #[inline] + fn set_bit(&mut self, index: usize) { + self[index / 64] |= 1u64 << (index % 64); + } #[inline(always)] - fn check(&self, needle: u8) -> bool { - let index = needle as usize; - (self.bits[index / 64] >> (index % 64)) & 1 != 0 + fn get_bit(&self, index: usize) -> bool { + (self[index / 64] >> (index % 64)) & 1 != 0 } } -impl StaticFilter for UInt8BitmapFilter { - fn null_count(&self) -> usize { - self.null_count +pub(super) trait BitmapFilterConfig: Send + Sync + 'static { + const DATA_TYPE_NAME: &'static str; + + type Native: ArrowNativeType + Copy + Send + Sync; + type ArrowType: ArrowPrimitiveType; + type Storage: BitmapStorage; + + fn to_index(v: Self::Native) -> usize; +} + +pub(super) enum UInt8BitmapConfig {} +impl BitmapFilterConfig for UInt8BitmapConfig { + const DATA_TYPE_NAME: &'static str = "UInt8"; + + type Native = u8; + type ArrowType = UInt8Type; + type Storage = [u64; 4]; + + #[inline(always)] + fn to_index(v: u8) -> usize { + v as usize } +} - fn contains(&self, v: &dyn Array, negated: bool) -> Result { - handle_dictionary!(self, v, negated); - let v = v.as_primitive_opt::().ok_or_else(|| { - exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array") - })?; - let input_values = v.values(); - Ok(build_in_list_result( - v.len(), - v.nulls(), - self.null_count > 0, - negated, - #[inline(always)] - |i| { - // SAFETY: `build_in_list_result` invokes this closure for - // indices in `0..v.len()`, which matches `input_values.len()`. - let needle = unsafe { *input_values.get_unchecked(i) }; - self.check(needle) - }, - )) +pub(super) enum UInt16BitmapConfig {} +impl BitmapFilterConfig for UInt16BitmapConfig { + const DATA_TYPE_NAME: &'static str = "UInt16"; + + type Native = u16; + type ArrowType = UInt16Type; + type Storage = Box<[u64; 1024]>; + + #[inline(always)] + fn to_index(v: u16) -> usize { + v as usize } } -/// Bitmap filter for O(1) `UInt16` set membership via single bit test. +/// Bitmap filter for O(1) set membership via single bit test. /// -/// `UInt16` has 65,536 possible values, so the filter stores membership in an -/// 8 KiB heap-allocated bitmap instead of using a hash table. -pub(super) struct UInt16BitmapFilter { +/// Small integer domains can store membership in a fixed-size bitmap instead +/// of using a hash table. +pub(super) struct BitmapFilter { null_count: usize, - bits: Box<[u64; 1024]>, + bits: C::Storage, } -impl UInt16BitmapFilter { +impl BitmapFilter { pub(super) fn try_new(in_array: &ArrayRef) -> Result { - let prim_array = in_array.as_primitive_opt::().ok_or_else(|| { - exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array") - })?; - let mut bits = Box::new([0u64; 1024]); + let prim_array = + in_array.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("BitmapFilter: expected {} array", C::DATA_TYPE_NAME) + })?; + let mut bits = C::Storage::new_zeroed(); for v in prim_array.iter().flatten() { - let index = v as usize; - bits[index / 64] |= 1u64 << (index % 64); + bits.set_bit(C::to_index(v)); } Ok(Self { null_count: prim_array.null_count(), @@ -113,21 +128,20 @@ impl UInt16BitmapFilter { } #[inline(always)] - fn check(&self, needle: u16) -> bool { - let index = needle as usize; - (self.bits[index / 64] >> (index % 64)) & 1 != 0 + fn check(&self, needle: C::Native) -> bool { + self.bits.get_bit(C::to_index(needle)) } } -impl StaticFilter for UInt16BitmapFilter { +impl StaticFilter for BitmapFilter { fn null_count(&self) -> usize { self.null_count } fn contains(&self, v: &dyn Array, negated: bool) -> Result { handle_dictionary!(self, v, negated); - let v = v.as_primitive_opt::().ok_or_else(|| { - exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array") + let v = v.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("BitmapFilter: expected {} array", C::DATA_TYPE_NAME) })?; let input_values = v.values(); Ok(build_in_list_result( @@ -373,7 +387,7 @@ mod tests { #[test] fn bitmap_filter_u8_handles_nulls() -> Result<()> { let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)])); - let filter = UInt8BitmapFilter::try_new(&haystack)?; + let filter = BitmapFilter::::try_new(&haystack)?; let needles = UInt8Array::from(vec![Some(1), Some(2), None, Some(3)]); assert_contains(&filter, &needles, vec![Some(true), None, None, Some(true)])?; @@ -388,7 +402,7 @@ mod tests { #[test] fn bitmap_filter_u8_handles_dictionary_needles() -> Result<()> { let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)])); - let filter = UInt8BitmapFilter::try_new(&haystack)?; + let filter = BitmapFilter::::try_new(&haystack)?; let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]); let values = Arc::new(UInt8Array::from(vec![Some(1), Some(2), Some(3)])); @@ -405,7 +419,7 @@ mod tests { Some(1024), Some(u16::MAX), ])); - let filter = UInt16BitmapFilter::try_new(&haystack)?; + let filter = BitmapFilter::::try_new(&haystack)?; let needles = UInt16Array::from(vec![Some(0), Some(1), Some(1024), Some(u16::MAX), None]); diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index aec94bddb920b..53adb4edb3e96 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -42,8 +42,12 @@ pub(super) fn instantiate_static_filter( DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)), DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), - DataType::UInt8 => Ok(Arc::new(UInt8BitmapFilter::try_new(&in_array)?)), - DataType::UInt16 => Ok(Arc::new(UInt16BitmapFilter::try_new(&in_array)?)), + DataType::UInt8 => Ok(Arc::new(BitmapFilter::::try_new( + &in_array, + )?)), + DataType::UInt16 => Ok(Arc::new(BitmapFilter::::try_new( + &in_array, + )?)), DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), // Float primitive types (use ordered wrappers for Hash/Eq) From 1db56277b7b37dbd74e49a89ba46fe3a86e45045 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 9 Jan 2026 11:59:30 +0100 Subject: [PATCH 07/11] Implement Zero-Copy Reinterpretation and enable Int8/Int16 Bitmaps Introduces zero-copy buffer reinterpretation to allow signed integers and other 1 or 2-byte primitive types (e.g. Float16) to use the high-performance bitmap filters. Triggers for all types with 1-byte or 2-byte width. --- .../physical-expr/src/expressions/in_list.rs | 1 + .../expressions/in_list/primitive_filter.rs | 19 ++- .../src/expressions/in_list/strategy.rs | 16 +- .../src/expressions/in_list/transform.rs | 154 ++++++++++++++++++ 4 files changed, 178 insertions(+), 12 deletions(-) create mode 100644 datafusion/physical-expr/src/expressions/in_list/transform.rs diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 50ff3936937bf..be73b0a9d11be 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -41,6 +41,7 @@ mod primitive_filter; mod result; mod static_filter; mod strategy; +mod transform; use static_filter::StaticFilter; use strategy::instantiate_static_filter; diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index de5df6ef06282..e71c0c18b6798 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -131,6 +131,22 @@ impl BitmapFilter { fn check(&self, needle: C::Native) -> bool { self.bits.get_bit(C::to_index(needle)) } + + /// Check membership using a raw values slice (zero-copy path for type reinterpretation). + #[inline] + pub(super) fn contains_slice( + &self, + values: &[C::Native], + nulls: Option<&NullBuffer>, + negated: bool, + ) -> BooleanArray { + build_in_list_result(values.len(), nulls, self.null_count > 0, negated, |i| { + // SAFETY: `build_in_list_result` invokes this closure for + // indices in `0..values.len()`. + let needle = unsafe { *values.get_unchecked(i) }; + self.check(needle) + }) + } } impl StaticFilter for BitmapFilter { @@ -345,9 +361,6 @@ macro_rules! primitive_static_filter { }; } -// Generate specialized filters for all integer primitive types -primitive_static_filter!(Int8StaticFilter, Int8Type); -primitive_static_filter!(Int16StaticFilter, Int16Type); primitive_static_filter!(Int32StaticFilter, Int32Type); primitive_static_filter!(Int64StaticFilter, Int64Type); primitive_static_filter!(UInt32StaticFilter, UInt32Type); diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index 53adb4edb3e96..942a7b15b6ca7 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -25,6 +25,7 @@ use datafusion_common::Result; use super::array_static_filter::ArrayStaticFilter; use super::primitive_filter::*; use super::static_filter::StaticFilter; +use super::transform::make_bitmap_filter; pub(super) fn instantiate_static_filter( in_array: ArrayRef, @@ -37,17 +38,14 @@ pub(super) fn instantiate_static_filter( _ => in_array, }; match in_array.data_type() { - // Integer primitive types - DataType::Int8 => Ok(Arc::new(Int8StaticFilter::try_new(&in_array)?)), - DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)), + DataType::Int8 | DataType::UInt8 => { + make_bitmap_filter::(&in_array) + } + DataType::Int16 | DataType::UInt16 => { + make_bitmap_filter::(&in_array) + } DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), - DataType::UInt8 => Ok(Arc::new(BitmapFilter::::try_new( - &in_array, - )?)), - DataType::UInt16 => Ok(Arc::new(BitmapFilter::::try_new( - &in_array, - )?)), DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), // Float primitive types (use ordered wrappers for Hash/Eq) diff --git a/datafusion/physical-expr/src/expressions/in_list/transform.rs b/datafusion/physical-expr/src/expressions/in_list/transform.rs new file mode 100644 index 0000000000000..36ae944b15cb2 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/in_list/transform.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Type transformation utilities for InList filters. +//! +//! Some filters only depend on fixed-width value bit patterns. For those cases, +//! compatible primitive arrays can be reinterpreted to the filter's unsigned +//! storage type without copying values. + +use std::mem::size_of; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BooleanArray, PrimitiveArray}; +use arrow::buffer::ScalarBuffer; +use arrow::datatypes::ArrowPrimitiveType; +use datafusion_common::{Result, exec_datafusion_err}; + +use super::primitive_filter::{BitmapFilter, BitmapFilterConfig}; +use super::static_filter::{StaticFilter, handle_dictionary}; + +// ============================================================================= +// REINTERPRETING FILTERS (zero-copy type conversion) +// ============================================================================= + +/// Reinterpreting filter for bitmap lookups (u8/u16). +struct ReinterpretedBitmap { + inner: BitmapFilter, +} + +impl StaticFilter for ReinterpretedBitmap { + fn null_count(&self) -> usize { + self.inner.null_count() + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + + if v.data_type().primitive_width() != Some(size_of::()) { + return Err(exec_datafusion_err!( + "BitmapFilter: expected {}-byte primitive array, got {}", + size_of::(), + v.data_type() + )); + } + + let data = v.to_data(); + let values: &[C::Native] = &data.buffer::(0)[..v.len()]; + + Ok(self.inner.contains_slice(values, data.nulls(), negated)) + } +} + +/// Reinterprets a same-width primitive array as the target primitive type `T`. +/// +/// This is a zero-copy operation: the returned array shares the original values +/// buffer and null buffer. Callers must ensure the source array and target type +/// have the same primitive width. +#[inline] +pub(crate) fn reinterpret_any_primitive_to( + array: &dyn Array, +) -> ArrayRef { + let data = array.to_data(); + let values = data.buffers()[0].clone(); + let buffer = ScalarBuffer::::new(values, data.offset(), data.len()); + Arc::new(PrimitiveArray::::new(buffer, array.nulls().cloned())) +} + +/// Creates a bitmap filter for u8/u16 types, reinterpreting if needed. +pub(crate) fn make_bitmap_filter( + in_array: &ArrayRef, +) -> Result> +where + C: BitmapFilterConfig, +{ + if in_array.data_type() == &C::ArrowType::DATA_TYPE { + return Ok(Arc::new(BitmapFilter::::try_new(in_array)?)); + } + if in_array.data_type().primitive_width() != Some(size_of::()) { + return Err(exec_datafusion_err!( + "BitmapFilter: expected {}-byte primitive array for {} bitmap, got {}", + size_of::(), + C::DATA_TYPE_NAME, + in_array.data_type() + )); + } + + let reinterpreted = reinterpret_any_primitive_to::(in_array.as_ref()); + let inner = BitmapFilter::::try_new(&reinterpreted)?; + Ok(Arc::new(ReinterpretedBitmap { inner })) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + use arrow::array::{ArrayRef, BooleanArray, Int8Array, Int16Array}; + + #[test] + fn reinterpreted_bitmap_handles_signed_boundaries_and_slices() -> Result<()> { + let haystack: ArrayRef = Arc::new( + Int8Array::from(vec![Some(99), Some(i8::MIN), None, Some(-1), Some(42)]) + .slice(1, 3), + ); + let filter = make_bitmap_filter::< + super::super::primitive_filter::UInt8BitmapConfig, + >(&haystack)?; + let needles = + Int8Array::from(vec![Some(7), Some(i8::MIN), Some(-1), None]).slice(1, 3); + + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), Some(true), None]) + ); + + let haystack: ArrayRef = Arc::new( + Int16Array::from(vec![ + Some(123), + Some(i16::MIN), + None, + Some(-1), + Some(i16::MAX), + ]) + .slice(1, 4), + ); + let filter = make_bitmap_filter::< + super::super::primitive_filter::UInt16BitmapConfig, + >(&haystack)?; + let needles = + Int16Array::from(vec![Some(0), Some(i16::MIN), Some(7), Some(i16::MAX)]) + .slice(1, 3); + + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), None, Some(true)]) + ); + + Ok(()) + } +} From d57ed3f5c04269a8709c26af460e78df9d76c845 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 9 Jan 2026 11:59:30 +0100 Subject: [PATCH 08/11] Implement Branchless Filter for small primitive lists Adds a const-generic unrolled comparison chain that avoids CPU branching. Outperforms hash lookups for very small lists. Triggers for primitives when list size <= 32 (4-byte), 16 (8-byte), or 4 (16-byte). --- .../expressions/in_list/primitive_filter.rs | 92 ++++++++++ .../src/expressions/in_list/strategy.rs | 117 ++++++++++--- .../src/expressions/in_list/transform.rs | 159 +++++++++++++++++- 3 files changed, 341 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index e71c0c18b6798..fb1d34bb0e5ec 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -176,6 +176,98 @@ impl StaticFilter for BitmapFilter { } } +/// A branchless filter for very small fixed-width primitive IN lists. +/// +/// Uses const generics to unroll the membership check into a fixed-size +/// comparison chain, outperforming hash lookups for small lists due to: +/// - No branching (uses bitwise OR to combine comparisons) +/// - Better CPU pipelining +/// - No hash computation overhead +pub(super) struct BranchlessFilter { + null_count: usize, + values: [T::Native; N], +} + +impl BranchlessFilter +where + T::Native: Copy + PartialEq, +{ + /// Try to create a branchless filter if the array has exactly N non-null values. + pub(super) fn try_new(in_array: &ArrayRef) -> Option> { + let in_array = in_array.as_primitive_opt::()?; + let non_null_count = in_array.len() - in_array.null_count(); + if non_null_count != N { + return None; + } + // Use default_value() from ArrowPrimitiveType trait instead of Default::default() + let mut arr = [T::default_value(); N]; + let mut i = 0; + for value in in_array.iter().flatten() { + arr[i] = value; + i += 1; + } + debug_assert_eq!(i, N); + Some(Ok(Self { + null_count: in_array.null_count(), + values: arr, + })) + } + + /// Branchless membership check using OR-chain. + #[inline(always)] + fn check(&self, needle: T::Native) -> bool { + self.values + .iter() + .fold(false, |acc, &v| acc | (v == needle)) + } + + /// Check membership using a raw values slice (zero-copy path for type reinterpretation). + #[inline] + pub(super) fn contains_slice( + &self, + values: &[T::Native], + nulls: Option<&NullBuffer>, + negated: bool, + ) -> BooleanArray { + build_in_list_result(values.len(), nulls, self.null_count > 0, negated, |i| { + // SAFETY: `build_in_list_result` invokes this closure for + // indices in `0..values.len()`. + let needle = unsafe { *values.get_unchecked(i) }; + self.check(needle) + }) + } +} + +impl StaticFilter for BranchlessFilter +where + T::Native: Copy + PartialEq + Send + Sync, +{ + fn null_count(&self) -> usize { + self.null_count + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + let v = v.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!("Failed to downcast array to primitive type") + })?; + let input_values = v.values(); + Ok(build_in_list_result( + v.len(), + v.nulls(), + self.null_count > 0, + negated, + #[inline(always)] + |i| { + // SAFETY: `build_in_list_result` invokes this closure for + // indices in `0..v.len()`, which matches `input_values.len()`. + let needle = unsafe { *input_values.get_unchecked(i) }; + self.check(needle) + }, + )) + } +} + /// Wrapper for f32 that implements Hash and Eq using bit comparison. /// This treats NaN values as equal to each other when they have the same bit pattern. #[derive(Clone, Copy)] diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index 942a7b15b6ca7..c99bf49f9e742 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -19,14 +19,66 @@ use std::sync::Arc; use arrow::array::ArrayRef; use arrow::compute::cast; -use arrow::datatypes::DataType; -use datafusion_common::Result; +use arrow::datatypes::*; +use datafusion_common::{Result, exec_datafusion_err}; use super::array_static_filter::ArrayStaticFilter; use super::primitive_filter::*; use super::static_filter::StaticFilter; -use super::transform::make_bitmap_filter; +use super::transform::{make_bitmap_filter, make_branchless_filter}; +/// Maximum list size for branchless lookup on 4-byte primitives (Int32, UInt32, Float32). +const BRANCHLESS_MAX_4B: usize = 32; + +/// Maximum list size for branchless lookup on 8-byte primitives (Int64, UInt64, Float64). +const BRANCHLESS_MAX_8B: usize = 16; + +/// Maximum list size for branchless lookup on 16-byte types (Decimal128). +const BRANCHLESS_MAX_16B: usize = 4; + +/// The lookup strategy to use for a given data type and list size. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FilterStrategy { + /// Bitmap filter for u8/u16 domains. + Bitmap1B, + Bitmap2B, + /// Branchless OR-chain for small lists. + Branchless, + /// Generic ArrayStaticFilter fallback. + Generic, +} + +/// Selects the lookup strategy based on data type and list size. +fn select_strategy(dt: &DataType, len: usize) -> FilterStrategy { + match dt.primitive_width() { + Some(1) => FilterStrategy::Bitmap1B, + Some(2) => FilterStrategy::Bitmap2B, + Some(4) => { + if len <= BRANCHLESS_MAX_4B { + FilterStrategy::Branchless + } else { + FilterStrategy::Generic + } + } + Some(8) => { + if len <= BRANCHLESS_MAX_8B { + FilterStrategy::Branchless + } else { + FilterStrategy::Generic + } + } + Some(16) => { + if len <= BRANCHLESS_MAX_16B { + FilterStrategy::Branchless + } else { + FilterStrategy::Generic + } + } + _ => FilterStrategy::Generic, + } +} + +/// Creates the optimal static filter for the given array. pub(super) fn instantiate_static_filter( in_array: ArrayRef, ) -> Result> { @@ -37,23 +89,46 @@ pub(super) fn instantiate_static_filter( DataType::Dictionary(_, value_type) => cast(&in_array, value_type.as_ref())?, _ => in_array, }; - match in_array.data_type() { - DataType::Int8 | DataType::UInt8 => { - make_bitmap_filter::(&in_array) - } - DataType::Int16 | DataType::UInt16 => { - make_bitmap_filter::(&in_array) - } - DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), - DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), - DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), - DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), - // Float primitive types (use ordered wrappers for Hash/Eq) - DataType::Float32 => Ok(Arc::new(Float32StaticFilter::try_new(&in_array)?)), - DataType::Float64 => Ok(Arc::new(Float64StaticFilter::try_new(&in_array)?)), - _ => { - /* fall through to generic implementation for unsupported types (Struct, etc.) */ - Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)) - } + use FilterStrategy::*; + + let len = in_array.len(); + let dt = in_array.data_type(); + let strategy = select_strategy(dt, len); + + match (dt, strategy) { + // Bitmap filters for 1-byte and 2-byte types + (_, Bitmap1B) => make_bitmap_filter::(&in_array), + (_, Bitmap2B) => make_bitmap_filter::(&in_array), + + // Branchless filters for small lists of primitives + (_, Branchless) => dispatch_branchless(&in_array).ok_or_else(|| { + exec_datafusion_err!( + "Branchless strategy selected but no filter for {:?}", + dt + ) + })?, + + // Fallback for larger primitive lists or complex types. + (_, Generic) => match dt { + DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), + DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), + DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), + DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), + DataType::Float32 => Ok(Arc::new(Float32StaticFilter::try_new(&in_array)?)), + DataType::Float64 => Ok(Arc::new(Float64StaticFilter::try_new(&in_array)?)), + _ => Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)), + }, + } +} + +fn dispatch_branchless( + arr: &ArrayRef, +) -> Option>> { + // Dispatch to width-specific branchless filter. + match arr.data_type().primitive_width() { + Some(4) => Some(make_branchless_filter::(arr)), + Some(8) => Some(make_branchless_filter::(arr)), + Some(16) => Some(make_branchless_filter::(arr)), + _ => None, } } diff --git a/datafusion/physical-expr/src/expressions/in_list/transform.rs b/datafusion/physical-expr/src/expressions/in_list/transform.rs index 36ae944b15cb2..6b4a5523dc7f2 100644 --- a/datafusion/physical-expr/src/expressions/in_list/transform.rs +++ b/datafusion/physical-expr/src/expressions/in_list/transform.rs @@ -29,13 +29,9 @@ use arrow::buffer::ScalarBuffer; use arrow::datatypes::ArrowPrimitiveType; use datafusion_common::{Result, exec_datafusion_err}; -use super::primitive_filter::{BitmapFilter, BitmapFilterConfig}; +use super::primitive_filter::{BitmapFilter, BitmapFilterConfig, BranchlessFilter}; use super::static_filter::{StaticFilter, handle_dictionary}; -// ============================================================================= -// REINTERPRETING FILTERS (zero-copy type conversion) -// ============================================================================= - /// Reinterpreting filter for bitmap lookups (u8/u16). struct ReinterpretedBitmap { inner: BitmapFilter, @@ -64,6 +60,38 @@ impl StaticFilter for ReinterpretedBitmap { } } +/// Reinterpreting filter for branchless lookups. +struct ReinterpretedBranchless { + inner: BranchlessFilter, +} + +impl StaticFilter for ReinterpretedBranchless +where + T: ArrowPrimitiveType + 'static, + T::Native: Copy + PartialEq + Send + Sync + 'static, +{ + fn null_count(&self) -> usize { + self.inner.null_count() + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + + if v.data_type().primitive_width() != Some(size_of::()) { + return Err(exec_datafusion_err!( + "BranchlessFilter: expected {}-byte primitive array, got {}", + size_of::(), + v.data_type() + )); + } + + let data = v.to_data(); + let values: &[T::Native] = &data.buffer::(0)[..v.len()]; + + Ok(self.inner.contains_slice(values, data.nulls(), negated)) + } +} + /// Reinterprets a same-width primitive array as the target primitive type `T`. /// /// This is a zero-copy operation: the returned array shares the original values @@ -103,12 +131,114 @@ where Ok(Arc::new(ReinterpretedBitmap { inner })) } +/// Creates a branchless filter for primitive types. +/// +/// Dispatches based on byte width and element count: +/// - 4-byte types (Int32, Float32, etc.): supports 0-32 elements +/// - 8-byte types (Int64, Float64, Timestamp, etc.): supports 0-16 elements +/// - 16-byte types (Decimal128): supports 0-4 elements +pub(crate) fn make_branchless_filter( + in_array: &ArrayRef, +) -> Result> +where + D: ArrowPrimitiveType + 'static, + D::Native: Copy + PartialEq + Send + Sync + 'static, +{ + let is_native = in_array.data_type() == &D::DATA_TYPE; + let width = size_of::(); + let arr = if is_native { + Arc::clone(in_array) + } else { + if in_array.data_type().primitive_width() != Some(width) { + return Err(exec_datafusion_err!( + "BranchlessFilter: expected {width}-byte primitive array, got {}", + in_array.data_type() + )); + } + reinterpret_any_primitive_to::(in_array.as_ref()) + }; + let n = arr.len() - arr.null_count(); + + // Helper to create the filter for a known size N + #[inline] + fn create( + arr: &ArrayRef, + is_native: bool, + ) -> Result> + where + D::Native: Copy + PartialEq + Send + Sync + 'static, + { + let inner = BranchlessFilter::::try_new(arr) + .expect("size verified") + .expect("type verified"); + if is_native { + Ok(Arc::new(inner)) + } else { + Ok(Arc::new(ReinterpretedBranchless { inner })) + } + } + + // Match on (width, count) - shared sizes use or-patterns to avoid duplication + match (width, n) { + // All widths: 0-4 + (4 | 8 | 16, 0) => create::(&arr, is_native), + (4 | 8 | 16, 1) => create::(&arr, is_native), + (4 | 8 | 16, 2) => create::(&arr, is_native), + (4 | 8 | 16, 3) => create::(&arr, is_native), + (4 | 8 | 16, 4) => create::(&arr, is_native), + // 4-byte and 8-byte: 5-16 + (4 | 8, 5) => create::(&arr, is_native), + (4 | 8, 6) => create::(&arr, is_native), + (4 | 8, 7) => create::(&arr, is_native), + (4 | 8, 8) => create::(&arr, is_native), + (4 | 8, 9) => create::(&arr, is_native), + (4 | 8, 10) => create::(&arr, is_native), + (4 | 8, 11) => create::(&arr, is_native), + (4 | 8, 12) => create::(&arr, is_native), + (4 | 8, 13) => create::(&arr, is_native), + (4 | 8, 14) => create::(&arr, is_native), + (4 | 8, 15) => create::(&arr, is_native), + (4 | 8, 16) => create::(&arr, is_native), + // 4-byte only: 17-32 + (4, 17) => create::(&arr, is_native), + (4, 18) => create::(&arr, is_native), + (4, 19) => create::(&arr, is_native), + (4, 20) => create::(&arr, is_native), + (4, 21) => create::(&arr, is_native), + (4, 22) => create::(&arr, is_native), + (4, 23) => create::(&arr, is_native), + (4, 24) => create::(&arr, is_native), + (4, 25) => create::(&arr, is_native), + (4, 26) => create::(&arr, is_native), + (4, 27) => create::(&arr, is_native), + (4, 28) => create::(&arr, is_native), + (4, 29) => create::(&arr, is_native), + (4, 30) => create::(&arr, is_native), + (4, 31) => create::(&arr, is_native), + (4, 32) => create::(&arr, is_native), + // Error cases + (4, n) => datafusion_common::exec_err!( + "Branchless filter for 4-byte types supports 0-32 elements, got {n}" + ), + (8, n) => datafusion_common::exec_err!( + "Branchless filter for 8-byte types supports 0-16 elements, got {n}" + ), + (16, n) => datafusion_common::exec_err!( + "Branchless filter for 16-byte types supports 0-4 elements, got {n}" + ), + (w, _) => datafusion_common::exec_err!( + "Branchless filter not supported for {w}-byte types" + ), + } +} + #[cfg(test)] mod tests { use super::*; use std::sync::Arc; - use arrow::array::{ArrayRef, BooleanArray, Int8Array, Int16Array}; + use arrow::array::{ArrayRef, BooleanArray, Int8Array, Int16Array, Int32Array}; + use arrow::datatypes::UInt32Type; #[test] fn reinterpreted_bitmap_handles_signed_boundaries_and_slices() -> Result<()> { @@ -151,4 +281,21 @@ mod tests { Ok(()) } + + #[test] + fn reinterpreted_branchless_handles_slices() -> Result<()> { + let haystack: ArrayRef = Arc::new( + Int32Array::from(vec![Some(99), Some(-7), None, Some(42)]).slice(1, 3), + ); + let filter = make_branchless_filter::(&haystack)?; + let needles = + Int32Array::from(vec![Some(0), Some(-7), Some(1), Some(42)]).slice(1, 3); + + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), None, Some(true)]) + ); + + Ok(()) + } } From 2e20173e30789281efd8b98986823d08299e59ef Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 9 Jan 2026 11:59:30 +0100 Subject: [PATCH 09/11] Implement Direct Probe (Hash) Filter for large primitive lists Implements a fast hash table using open addressing with linear probing and a 25% load factor. Replaces the legacy HashSet for primitives, reducing indirection. Triggers for primitives when list size exceeds branchless thresholds. --- .../expressions/in_list/primitive_filter.rs | 383 ++++++++++-------- .../src/expressions/in_list/strategy.rs | 171 +++++++- 2 files changed, 359 insertions(+), 195 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index fb1d34bb0e5ec..8374e9141c5a2 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -20,10 +20,8 @@ //! This module provides membership tests for Arrow primitive types. use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; -use arrow::buffer::{BooleanBuffer, NullBuffer}; -use arrow::datatypes::*; +use arrow::datatypes::ArrowPrimitiveType; use datafusion_common::{HashSet, Result, exec_datafusion_err}; -use std::hash::{Hash, Hasher}; use super::result::build_in_list_result; use super::static_filter::{StaticFilter, handle_dictionary}; @@ -67,7 +65,7 @@ impl BitmapStorage for Box<[u64; 1024]> { pub(super) trait BitmapFilterConfig: Send + Sync + 'static { const DATA_TYPE_NAME: &'static str; - type Native: ArrowNativeType + Copy + Send + Sync; + type Native: arrow::datatypes::ArrowNativeType + Copy + Send + Sync; type ArrowType: ArrowPrimitiveType; type Storage: BitmapStorage; @@ -79,7 +77,7 @@ impl BitmapFilterConfig for UInt8BitmapConfig { const DATA_TYPE_NAME: &'static str = "UInt8"; type Native = u8; - type ArrowType = UInt8Type; + type ArrowType = arrow::datatypes::UInt8Type; type Storage = [u64; 4]; #[inline(always)] @@ -93,7 +91,7 @@ impl BitmapFilterConfig for UInt16BitmapConfig { const DATA_TYPE_NAME: &'static str = "UInt16"; type Native = u16; - type ArrowType = UInt16Type; + type ArrowType = arrow::datatypes::UInt16Type; type Storage = Box<[u64; 1024]>; #[inline(always)] @@ -137,7 +135,7 @@ impl BitmapFilter { pub(super) fn contains_slice( &self, values: &[C::Native], - nulls: Option<&NullBuffer>, + nulls: Option<&arrow::buffer::NullBuffer>, negated: bool, ) -> BooleanArray { build_in_list_result(values.len(), nulls, self.null_count > 0, negated, |i| { @@ -226,7 +224,7 @@ where pub(super) fn contains_slice( &self, values: &[T::Native], - nulls: Option<&NullBuffer>, + nulls: Option<&arrow::buffer::NullBuffer>, negated: bool, ) -> BooleanArray { build_in_list_result(values.len(), nulls, self.null_count > 0, negated, |i| { @@ -268,214 +266,219 @@ where } } -/// Wrapper for f32 that implements Hash and Eq using bit comparison. -/// This treats NaN values as equal to each other when they have the same bit pattern. -#[derive(Clone, Copy)] -struct OrderedFloat32(f32); +/// Load factor inverse for DirectProbeFilter hash table. +/// A value of 4 means 25% load factor (table is 4x the number of elements). +const LOAD_FACTOR_INVERSE: usize = 4; -impl Hash for OrderedFloat32 { - fn hash(&self, state: &mut H) { - self.0.to_ne_bytes().hash(state); - } -} +/// Minimum table size for DirectProbeFilter. +/// Ensures reasonable performance even for very small IN lists. +const MIN_TABLE_SIZE: usize = 16; -impl PartialEq for OrderedFloat32 { - fn eq(&self, other: &Self) -> bool { - self.0.to_bits() == other.0.to_bits() - } +/// Golden ratio constant for 32-bit hash mixing. +/// Derived from (2^32 / phi) where phi = (1 + sqrt(5)) / 2. +const GOLDEN_RATIO_32: u32 = 0x9e3779b9; + +/// Golden ratio constant for 64-bit hash mixing. +/// Derived from (2^64 / phi) where phi = (1 + sqrt(5)) / 2. +const GOLDEN_RATIO_64: u64 = 0x9e3779b97f4a7c15; + +/// Secondary mixing constant for 128-bit hashing (from SplitMix64). +/// Using a different constant for hi/lo avoids collisions when lo = hi * C. +const SPLITMIX_CONSTANT: u64 = 0xbf58476d1ce4e5b9; + +/// Fast hash filter using open addressing with linear probing. +/// +/// Uses a power-of-2 sized hash table for O(1) average-case lookups. +/// Optimized for the IN list use case with: +/// - Simple/fast hash function (golden ratio multiply + xor-shift) +/// - 25% load factor for minimal collisions +/// - Direct array storage for cache-friendly access +pub(super) struct DirectProbeFilter +where + T::Native: DirectProbeHashable, +{ + null_count: usize, + /// Hash table with open addressing. None = empty slot, Some(v) = value present + table: Box<[Option]>, + /// Mask for slot index (table.len() - 1, always power of 2 minus 1) + mask: usize, } -impl Eq for OrderedFloat32 {} +/// Trait for types that can be hashed for the direct probe filter. +/// +/// Requires `Hash + Eq` for deduplication via `HashSet`, even though we use +/// a custom `probe_hash()` for the actual hash table lookups. +pub(super) trait DirectProbeHashable: + Copy + PartialEq + std::hash::Hash + Eq +{ + fn probe_hash(self) -> usize; +} -impl From for OrderedFloat32 { - fn from(v: f32) -> Self { - Self(v) +// Simple but fast hash - golden ratio multiply + xor-shift +impl DirectProbeHashable for i32 { + #[inline(always)] + fn probe_hash(self) -> usize { + let x = self as u32; + let x = x.wrapping_mul(GOLDEN_RATIO_32); + (x ^ (x >> 16)) as usize } } -/// Wrapper for f64 that implements Hash and Eq using bit comparison. -/// This treats NaN values as equal to each other when they have the same bit pattern. -#[derive(Clone, Copy)] -struct OrderedFloat64(f64); - -impl Hash for OrderedFloat64 { - fn hash(&self, state: &mut H) { - self.0.to_ne_bytes().hash(state); +impl DirectProbeHashable for i64 { + #[inline(always)] + fn probe_hash(self) -> usize { + let x = self as u64; + let x = x.wrapping_mul(GOLDEN_RATIO_64); + (x ^ (x >> 32)) as usize } } -impl PartialEq for OrderedFloat64 { - fn eq(&self, other: &Self) -> bool { - self.0.to_bits() == other.0.to_bits() +impl DirectProbeHashable for u32 { + #[inline(always)] + fn probe_hash(self) -> usize { + (self as i32).probe_hash() } } -impl Eq for OrderedFloat64 {} +impl DirectProbeHashable for u64 { + #[inline(always)] + fn probe_hash(self) -> usize { + (self as i64).probe_hash() + } +} -impl From for OrderedFloat64 { - fn from(v: f64) -> Self { - Self(v) +impl DirectProbeHashable for i128 { + #[inline(always)] + fn probe_hash(self) -> usize { + // Mix both halves with different constants to avoid collisions when lo = hi * C + let lo = self as u64; + let hi = (self >> 64) as u64; + let x = lo.wrapping_mul(GOLDEN_RATIO_64) ^ hi.wrapping_mul(SPLITMIX_CONSTANT); + (x ^ (x >> 32)) as usize } } -// Macro to generate specialized StaticFilter implementations for primitive types -macro_rules! primitive_static_filter { - ($Name:ident, $ArrowType:ty) => { - primitive_static_filter!( - $Name, - $ArrowType, - <$ArrowType as ArrowPrimitiveType>::Native, - |v| v - ); - }; - ($Name:ident, $ArrowType:ty, $SetValueType:ty, $to_set_value:expr) => { - pub(super) struct $Name { - null_count: usize, - values: HashSet<$SetValueType>, - } +impl DirectProbeFilter +where + T::Native: DirectProbeHashable, +{ + pub(super) fn try_new(in_array: &ArrayRef) -> Result { + let arr = in_array.as_primitive_opt::().ok_or_else(|| { + exec_datafusion_err!( + "DirectProbeFilter: expected {} array", + std::any::type_name::() + ) + })?; - impl $Name { - pub(super) fn try_new(in_array: &ArrayRef) -> Result { - let in_array = - in_array.as_primitive_opt::<$ArrowType>().ok_or_else(|| { - exec_datafusion_err!( - "Failed to downcast an array to a '{}' array", - stringify!($ArrowType) - ) - })?; - - let mut values = HashSet::with_capacity(in_array.len()); - let null_count = in_array.null_count(); - - for v in in_array.iter().flatten() { - values.insert(($to_set_value)(v)); - } + // Collect unique values using HashSet for deduplication + let unique_values: HashSet<_> = arr.iter().flatten().collect(); + + Ok(Self::from_unique_values(unique_values, arr.null_count())) + } - Ok(Self { null_count, values }) + fn from_unique_values(unique_values: HashSet, null_count: usize) -> Self { + // Size table to ~25% load factor for fewer collisions + let n = unique_values.len().max(1); + let table_size = (n * LOAD_FACTOR_INVERSE) + .next_power_of_two() + .max(MIN_TABLE_SIZE); + let mask = table_size - 1; + + let mut table: Box<[Option]> = + vec![None; table_size].into_boxed_slice(); + + // Insert all values using linear probing + for v in unique_values { + let mut slot = v.probe_hash() & mask; + loop { + if table[slot].is_none() { + table[slot] = Some(v); + break; + } + slot = (slot + 1) & mask; } } - impl StaticFilter for $Name { - fn null_count(&self) -> usize { - self.null_count - } + Self { + null_count, + table, + mask, + } + } - fn contains(&self, v: &dyn Array, negated: bool) -> Result { - handle_dictionary!(self, v, negated); - - let v = v.as_primitive_opt::<$ArrowType>().ok_or_else(|| { - exec_datafusion_err!( - "Failed to downcast an array to a '{}' array", - stringify!($ArrowType) - ) - })?; - - let haystack_has_nulls = self.null_count > 0; - let needle_values = v.values(); - let needle_nulls = v.nulls(); - let needle_has_nulls = v.null_count() > 0; - - // Truth table for `value [NOT] IN (set)` with SQL three-valued logic: - // ("-" means the value doesn't affect the result) - // - // | needle_null | haystack_null | negated | in set? | result | - // |-------------|---------------|---------|---------|--------| - // | true | - | false | - | null | - // | true | - | true | - | null | - // | false | true | false | yes | true | - // | false | true | false | no | null | - // | false | true | true | yes | false | - // | false | true | true | no | null | - // | false | false | false | yes | true | - // | false | false | false | no | false | - // | false | false | true | yes | false | - // | false | false | true | no | true | - - // Compute the "contains" result using collect_bool (fast batched approach) - // This ignores nulls - we handle them separately - let contains_buffer = if negated { - BooleanBuffer::collect_bool(needle_values.len(), |i| { - !self.values.contains(&($to_set_value)(needle_values[i])) - }) - } else { - BooleanBuffer::collect_bool(needle_values.len(), |i| { - self.values.contains(&($to_set_value)(needle_values[i])) - }) - }; - - // Compute the null mask - // Output is null when: - // 1. needle value is null, OR - // 2. needle value is not in set AND haystack has nulls - let result_nulls = match (needle_has_nulls, haystack_has_nulls) { - (false, false) => { - // No nulls anywhere - None - } - (true, false) => { - // Only needle has nulls - just use needle's null mask - needle_nulls.cloned() - } - (false, true) => { - // Only haystack has nulls - result is null when value not in set - // Valid (not null) when original "in set" is true - // For NOT IN: contains_buffer = !original, so validity = !contains_buffer - let validity = if negated { - !&contains_buffer - } else { - contains_buffer.clone() - }; - Some(NullBuffer::new(validity)) - } - (true, true) => { - // Both have nulls - combine needle nulls with haystack-induced nulls - let needle_validity = - needle_nulls.map(|n| n.inner().clone()).unwrap_or_else( - || BooleanBuffer::new_set(needle_values.len()), - ); - - // Valid when original "in set" is true (see above) - let haystack_validity = if negated { - !&contains_buffer - } else { - contains_buffer.clone() - }; - - // Combined validity: valid only where both are valid - let combined_validity = &needle_validity & &haystack_validity; - Some(NullBuffer::new(combined_validity)) - } - }; - - Ok(BooleanArray::new(contains_buffer, result_nulls)) + /// O(1) single-value lookup with linear probing. + /// + /// Returns true if the value is in the set. + #[inline(always)] + fn contains_single(&self, needle: T::Native) -> bool { + let mut slot = needle.probe_hash() & self.mask; + loop { + // SAFETY: `slot` is always < table.len() because: + // - `slot = hash & mask` where `mask = table.len() - 1` + // - table size is always a power of 2 + // - `(slot + 1) & mask` wraps around within bounds + match unsafe { self.table.get_unchecked(slot) } { + None => return false, + Some(v) if *v == needle => return true, + _ => slot = (slot + 1) & self.mask, } } - }; + } + + /// Check membership using a raw values slice + #[inline] + pub(super) fn contains_slice( + &self, + input: &[T::Native], + nulls: Option<&arrow::buffer::NullBuffer>, + negated: bool, + ) -> BooleanArray { + build_in_list_result(input.len(), nulls, self.null_count > 0, negated, |i| { + // SAFETY: `build_in_list_result` invokes this closure for + // indices in `0..input.len()`. + let needle = unsafe { *input.get_unchecked(i) }; + self.contains_single(needle) + }) + } } -primitive_static_filter!(Int32StaticFilter, Int32Type); -primitive_static_filter!(Int64StaticFilter, Int64Type); -primitive_static_filter!(UInt32StaticFilter, UInt32Type); -primitive_static_filter!(UInt64StaticFilter, UInt64Type); +impl StaticFilter for DirectProbeFilter +where + T: ArrowPrimitiveType + 'static, + T::Native: DirectProbeHashable + Send + Sync + 'static, +{ + #[inline] + fn null_count(&self) -> usize { + self.null_count + } -// Macro to generate specialized StaticFilter implementations for float types -// Floats require a wrapper type (OrderedFloat*) to implement Hash/Eq due to NaN semantics -macro_rules! float_static_filter { - ($Name:ident, $ArrowType:ty, $OrderedType:ty) => { - primitive_static_filter!($Name, $ArrowType, $OrderedType, <$OrderedType>::from); - }; -} + #[inline] + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + if v.data_type() != &T::DATA_TYPE { + return Err(exec_datafusion_err!( + "DirectProbeFilter: expected {} array, got {}", + T::DATA_TYPE, + v.data_type() + )); + } -// Generate specialized filters for float types using ordered wrappers -float_static_filter!(Float32StaticFilter, Float32Type, OrderedFloat32); -float_static_filter!(Float64StaticFilter, Float64Type, OrderedFloat64); + // Use raw buffer access for better optimization + let data = v.to_data(); + let values: &[T::Native] = &data.buffer::(0)[..v.len()]; + Ok(self.contains_slice(values, v.nulls(), negated)) + } +} #[cfg(test)] mod tests { use super::*; use std::sync::Arc; - use arrow::array::{DictionaryArray, Int8Array, UInt8Array, UInt16Array}; + use arrow::array::{ + DictionaryArray, Int8Array, UInt8Array, UInt16Array, UInt32Array, + }; + use arrow::datatypes::UInt32Type; fn assert_contains( filter: &dyn StaticFilter, @@ -540,4 +543,24 @@ mod tests { Ok(()) } + + #[test] + fn direct_probe_filter_handles_slices_and_nulls() -> Result<()> { + let haystack: ArrayRef = Arc::new( + UInt32Array::from(vec![Some(999), Some(10), None, Some(20), Some(30)]) + .slice(1, 4), + ); + let filter = DirectProbeFilter::::try_new(&haystack)?; + let needles = + UInt32Array::from(vec![Some(0), Some(10), Some(11), Some(30), None]) + .slice(1, 4); + + assert_contains(&filter, &needles, vec![Some(true), None, Some(true), None])?; + assert_eq!( + filter.contains(&needles, true)?, + BooleanArray::from(vec![Some(false), None, Some(false), None]) + ); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index c99bf49f9e742..48bb16004e5f9 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -24,8 +24,10 @@ use datafusion_common::{Result, exec_datafusion_err}; use super::array_static_filter::ArrayStaticFilter; use super::primitive_filter::*; -use super::static_filter::StaticFilter; -use super::transform::{make_bitmap_filter, make_branchless_filter}; +use super::static_filter::{StaticFilter, handle_dictionary}; +use super::transform::{ + make_bitmap_filter, make_branchless_filter, reinterpret_any_primitive_to, +}; /// Maximum list size for branchless lookup on 4-byte primitives (Int32, UInt32, Float32). const BRANCHLESS_MAX_4B: usize = 32; @@ -44,6 +46,8 @@ enum FilterStrategy { Bitmap2B, /// Branchless OR-chain for small lists. Branchless, + /// Direct-probe hash table for larger primitive lists. + Hashed, /// Generic ArrayStaticFilter fallback. Generic, } @@ -57,21 +61,21 @@ fn select_strategy(dt: &DataType, len: usize) -> FilterStrategy { if len <= BRANCHLESS_MAX_4B { FilterStrategy::Branchless } else { - FilterStrategy::Generic + FilterStrategy::Hashed } } Some(8) => { if len <= BRANCHLESS_MAX_8B { FilterStrategy::Branchless } else { - FilterStrategy::Generic + FilterStrategy::Hashed } } Some(16) => { if len <= BRANCHLESS_MAX_16B { FilterStrategy::Branchless } else { - FilterStrategy::Generic + FilterStrategy::Hashed } } _ => FilterStrategy::Generic, @@ -108,16 +112,13 @@ pub(super) fn instantiate_static_filter( ) })?, - // Fallback for larger primitive lists or complex types. - (_, Generic) => match dt { - DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)), - DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)), - DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)), - DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)), - DataType::Float32 => Ok(Arc::new(Float32StaticFilter::try_new(&in_array)?)), - DataType::Float64 => Ok(Arc::new(Float64StaticFilter::try_new(&in_array)?)), - _ => Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)), - }, + // Hash filters for larger lists of primitives. + (_, Hashed) => dispatch_hashed(&in_array).ok_or_else(|| { + exec_datafusion_err!("Hashed strategy selected but no filter for {:?}", dt) + })?, + + // Fallback for nested/complex types and strings. + (_, Generic) => Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)), } } @@ -132,3 +133,143 @@ fn dispatch_branchless( _ => None, } } + +fn dispatch_hashed( + arr: &ArrayRef, +) -> Option>> { + macro_rules! direct_probe_filter { + ($T:ty) => { + return Some( + DirectProbeFilter::<$T>::try_new(arr) + .map(|f| Arc::new(f) as Arc), + ) + }; + } + match arr.data_type() { + DataType::Int32 => direct_probe_filter!(Int32Type), + DataType::Int64 => direct_probe_filter!(Int64Type), + DataType::UInt32 => direct_probe_filter!(UInt32Type), + DataType::UInt64 => direct_probe_filter!(UInt64Type), + _ => {} + } + + // For other primitive types, reinterpret same-width bit patterns. + match arr.data_type().primitive_width() { + Some(4) => Some(make_direct_probe_filter_reinterpreted::(arr)), + Some(8) => Some(make_direct_probe_filter_reinterpreted::(arr)), + Some(16) => Some(make_direct_probe_filter_reinterpreted::( + arr, + )), + _ => None, + } +} + +/// Creates a DirectProbeFilter with type reinterpretation for same-width primitive types. +fn make_direct_probe_filter_reinterpreted( + in_array: &ArrayRef, +) -> Result> +where + D: ArrowPrimitiveType + 'static, + D::Native: Send + Sync + DirectProbeHashable + 'static, +{ + if in_array.data_type() == &D::DATA_TYPE { + return Ok(Arc::new(DirectProbeFilter::::try_new(in_array)?)); + } + if in_array.data_type().primitive_width() != Some(size_of::()) { + return Err(exec_datafusion_err!( + "DirectProbeFilter: expected {}-byte primitive array, got {}", + size_of::(), + in_array.data_type() + )); + } + + let reinterpreted = reinterpret_any_primitive_to::(in_array.as_ref()); + let inner = DirectProbeFilter::::try_new(&reinterpreted)?; + Ok(Arc::new(ReinterpretedDirectProbeFilter { inner })) +} + +/// Wrapper for DirectProbeFilter with type reinterpretation. +struct ReinterpretedDirectProbeFilter +where + D::Native: DirectProbeHashable, +{ + inner: DirectProbeFilter, +} + +impl StaticFilter for ReinterpretedDirectProbeFilter +where + D: ArrowPrimitiveType + 'static, + D::Native: Send + Sync + DirectProbeHashable + 'static, +{ + #[inline] + fn null_count(&self) -> usize { + self.inner.null_count() + } + + #[inline] + fn contains( + &self, + v: &dyn arrow::array::Array, + negated: bool, + ) -> Result { + handle_dictionary!(self, v, negated); + if v.data_type().primitive_width() != Some(size_of::()) { + return Err(exec_datafusion_err!( + "DirectProbeFilter: expected {}-byte primitive array, got {}", + size_of::(), + v.data_type() + )); + } + + let data = v.to_data(); + let values: &[D::Native] = &data.buffer::(0)[..v.len()]; + Ok(self.inner.contains_slice(values, v.nulls(), negated)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use arrow::array::{ArrayRef, BooleanArray, Float64Array}; + + #[test] + fn direct_probe_strategy_handles_reinterpreted_slices() -> Result<()> { + let haystack: ArrayRef = Arc::new( + Float64Array::from(vec![ + Some(999.0), + Some(0.0), + Some(1.0), + None, + Some(f64::NAN), + Some(4.0), + Some(5.0), + Some(6.0), + Some(7.0), + Some(8.0), + Some(9.0), + Some(10.0), + Some(11.0), + Some(12.0), + Some(13.0), + Some(14.0), + Some(15.0), + Some(16.0), + Some(17.0), + Some(1234.0), + ]) + .slice(1, 18), + ); + let filter = instantiate_static_filter(haystack)?; + let needles = + Float64Array::from(vec![Some(999.0), Some(1.0), Some(3.0), Some(f64::NAN)]) + .slice(1, 3); + + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), None, Some(true)]) + ); + + Ok(()) + } +} From 541d7ec5c2340d1f000a774e72705626e452a394 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 9 Jan 2026 11:59:30 +0100 Subject: [PATCH 10/11] Implement String View (Utf8View/BinaryView) Optimizations Introduces a two-stage filter for ByteView types. Stage 1 uses a fast DirectProbeFilter on masked views (len + prefix) for quick rejection; Stage 2 performs full verification only for potential long-string matches. Triggers for Utf8View and BinaryView. --- .../expressions/in_list/primitive_filter.rs | 8 +- .../src/expressions/in_list/result.rs | 63 +++ .../src/expressions/in_list/strategy.rs | 22 +- .../src/expressions/in_list/transform.rs | 374 +++++++++++++++++- 4 files changed, 460 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs index 8374e9141c5a2..a6f7639f691ec 100644 --- a/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs +++ b/datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs @@ -376,6 +376,12 @@ where Ok(Self::from_unique_values(unique_values, arr.null_count())) } + /// Creates a DirectProbeFilter from pre-processed values. + pub(super) fn from_values(values: impl Iterator) -> Self { + let unique_values: HashSet<_> = values.collect(); + Self::from_unique_values(unique_values, 0) + } + fn from_unique_values(unique_values: HashSet, null_count: usize) -> Self { // Size table to ~25% load factor for fewer collisions let n = unique_values.len().max(1); @@ -410,7 +416,7 @@ where /// /// Returns true if the value is in the set. #[inline(always)] - fn contains_single(&self, needle: T::Native) -> bool { + pub(super) fn contains_single(&self, needle: T::Native) -> bool { let mut slot = needle.probe_hash() & self.mask; loop { // SAFETY: `slot` is always < table.len() because: diff --git a/datafusion/physical-expr/src/expressions/in_list/result.rs b/datafusion/physical-expr/src/expressions/in_list/result.rs index 3ebdbfe19f743..55bce1139fea0 100644 --- a/datafusion/physical-expr/src/expressions/in_list/result.rs +++ b/datafusion/physical-expr/src/expressions/in_list/result.rs @@ -58,6 +58,69 @@ where build_result_from_contains(needle_nulls, haystack_has_nulls, negated, contains_buf) } +/// Builds a BooleanArray result while skipping contains checks for null needles. +/// +/// This is useful when `contains` is expensive, for example when a string path +/// may need hashing and full byte comparison. If there are no actual nulls, it +/// falls back to the same branch-free contains collection as +/// `build_in_list_result`. +#[inline] +pub(crate) fn build_in_list_result_with_null_shortcircuit( + len: usize, + needle_nulls: Option<&NullBuffer>, + needle_null_count: usize, + haystack_has_nulls: bool, + negated: bool, + mut contains: C, +) -> BooleanArray +where + C: FnMut(usize) -> bool, +{ + let effective_nulls = needle_nulls.filter(|_| needle_null_count > 0); + + let contains_buf = match effective_nulls { + Some(nulls) => { + BooleanBuffer::collect_bool(len, |i| nulls.is_valid(i) && contains(i)) + } + None => BooleanBuffer::collect_bool(len, contains), + }; + + build_result_from_premasked_contains( + effective_nulls, + haystack_has_nulls, + negated, + contains_buf, + ) +} + +/// Builds a result from a contains buffer that is already false at null needles. +#[inline] +fn build_result_from_premasked_contains( + needle_nulls: Option<&NullBuffer>, + haystack_has_nulls: bool, + negated: bool, + contains_buf: BooleanBuffer, +) -> BooleanArray { + match (needle_nulls, haystack_has_nulls, negated) { + (_, true, false) => { + BooleanArray::new(contains_buf.clone(), Some(NullBuffer::new(contains_buf))) + } + (Some(v), true, true) => BooleanArray::new( + v.inner() ^ &contains_buf, + Some(NullBuffer::new(contains_buf)), + ), + (None, true, true) => { + BooleanArray::new(!&contains_buf, Some(NullBuffer::new(contains_buf))) + } + (Some(v), false, false) => BooleanArray::new(contains_buf, Some(v.clone())), + (Some(v), false, true) => { + BooleanArray::new(v.inner() & &(!&contains_buf), Some(v.clone())) + } + (None, false, false) => BooleanArray::new(contains_buf, None), + (None, false, true) => BooleanArray::new(!&contains_buf, None), + } +} + /// Builds a BooleanArray result from a pre-computed contains buffer. /// /// This version does not assume contains_buf is pre-masked at null positions. diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index 48bb16004e5f9..0ad8523385df9 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -26,7 +26,9 @@ use super::array_static_filter::ArrayStaticFilter; use super::primitive_filter::*; use super::static_filter::{StaticFilter, handle_dictionary}; use super::transform::{ - make_bitmap_filter, make_branchless_filter, reinterpret_any_primitive_to, + make_bitmap_filter, make_branchless_filter, make_byte_view_masked_filter, + make_utf8view_branchless_filter, make_utf8view_hash_filter, + reinterpret_any_primitive_to, utf8view_all_short_strings, }; /// Maximum list size for branchless lookup on 4-byte primitives (Int32, UInt32, Float32). @@ -97,6 +99,16 @@ pub(super) fn instantiate_static_filter( let len = in_array.len(); let dt = in_array.data_type(); + + // Special case: Utf8View with short strings can be reinterpreted as i128 + if matches!(dt, DataType::Utf8View) && utf8view_all_short_strings(in_array.as_ref()) { + return if len <= BRANCHLESS_MAX_16B { + make_utf8view_branchless_filter(&in_array) + } else { + make_utf8view_hash_filter(&in_array) + }; + } + let strategy = select_strategy(dt, len); match (dt, strategy) { @@ -117,6 +129,14 @@ pub(super) fn instantiate_static_filter( exec_datafusion_err!("Hashed strategy selected but no filter for {:?}", dt) })?, + // Byte view filters (Utf8View, BinaryView) + (DataType::Utf8View, Generic) => { + make_byte_view_masked_filter::(in_array) + } + (DataType::BinaryView, Generic) => { + make_byte_view_masked_filter::(in_array) + } + // Fallback for nested/complex types and strings. (_, Generic) => Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)), } diff --git a/datafusion/physical-expr/src/expressions/in_list/transform.rs b/datafusion/physical-expr/src/expressions/in_list/transform.rs index 6b4a5523dc7f2..d20215890283f 100644 --- a/datafusion/physical-expr/src/expressions/in_list/transform.rs +++ b/datafusion/physical-expr/src/expressions/in_list/transform.rs @@ -21,17 +21,34 @@ //! compatible primitive arrays can be reinterpreted to the filter's unsigned //! storage type without copying values. +use std::hash::BuildHasher; +use std::marker::PhantomData; use std::mem::size_of; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, BooleanArray, PrimitiveArray}; +use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray}; use arrow::buffer::ScalarBuffer; -use arrow::datatypes::ArrowPrimitiveType; +use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, Decimal128Type}; +use arrow::util::bit_iterator::BitIndexIterator; +use datafusion_common::hash_utils::RandomState; use datafusion_common::{Result, exec_datafusion_err}; +use hashbrown::HashTable; -use super::primitive_filter::{BitmapFilter, BitmapFilterConfig, BranchlessFilter}; +use super::primitive_filter::{ + BitmapFilter, BitmapFilterConfig, BranchlessFilter, DirectProbeFilter, +}; +use super::result::build_in_list_result_with_null_shortcircuit; use super::static_filter::{StaticFilter, handle_dictionary}; +/// Maximum length for inline strings (≤12 bytes can be stored in 16-byte view/encoding). +/// Used by both Utf8View short string optimization and Utf8 two-stage filter. +pub(crate) const INLINE_STRING_LEN: usize = 12; + +#[inline] +fn views_as_i128(views: &ScalarBuffer) -> &[i128] { + views.inner().typed_data() +} + /// Reinterpreting filter for bitmap lookups (u8/u16). struct ReinterpretedBitmap { inner: BitmapFilter, @@ -92,6 +109,28 @@ where } } +/// Hash filter for Utf8View short strings (≤12 bytes). +/// +/// Reinterprets the views buffer directly as i128 slice. +struct Utf8ViewHashFilter { + inner: DirectProbeFilter, +} + +impl StaticFilter for Utf8ViewHashFilter { + fn null_count(&self) -> usize { + self.inner.null_count() + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + + let sv = v.as_string_view(); + let values = views_as_i128(sv.views()); + + Ok(self.inner.contains_slice(values, sv.nulls(), negated)) + } +} + /// Reinterprets a same-width primitive array as the target primitive type `T`. /// /// This is a zero-copy operation: the returned array shares the original values @@ -232,13 +271,281 @@ where } } +// NOTE: Optimizations below assume Little Endian layout (DataFusion standard). + +/// Helper to extract the length from a Utf8View u128/i128 view. +#[inline(always)] +fn view_len(view: i128) -> u32 { + view as u32 +} + +/// Checks if all strings in a Utf8View array are short enough to be inline. +/// +/// In Utf8View, strings ≤12 bytes are stored inline in the 16-byte view struct. +/// These can be reinterpreted as i128 for fast equality comparison. +#[inline] +pub(crate) fn utf8view_all_short_strings(array: &dyn Array) -> bool { + let sv = array.as_string_view(); + sv.views().iter().enumerate().all(|(i, &view)| { + !sv.is_valid(i) || view_len(view as i128) as usize <= INLINE_STRING_LEN + }) +} + +/// Reinterprets a Utf8View array as Decimal128 by treating the view bytes as i128. +#[inline] +fn reinterpret_utf8view_as_decimal128(array: &dyn Array) -> ArrayRef { + let sv = array.as_string_view(); + let buffer = ScalarBuffer::::new(sv.views().inner().clone(), 0, sv.len()); + Arc::new(PrimitiveArray::::new( + buffer, + sv.nulls().cloned(), + )) +} + +/// Creates a hash filter for Utf8View arrays with short strings. +pub(crate) fn make_utf8view_hash_filter( + in_array: &ArrayRef, +) -> Result> { + let reinterpreted = reinterpret_utf8view_as_decimal128(in_array.as_ref()); + let inner = DirectProbeFilter::::try_new(&reinterpreted)?; + Ok(Arc::new(Utf8ViewHashFilter { inner })) +} + +/// Creates a branchless filter for Utf8View arrays with short strings. +pub(crate) fn make_utf8view_branchless_filter( + in_array: &ArrayRef, +) -> Result> { + let reinterpreted = reinterpret_utf8view_as_decimal128(in_array.as_ref()); + + macro_rules! try_branchless { + ($($n:literal),*) => { + $(if let Some(Ok(inner)) = BranchlessFilter::::try_new(&reinterpreted) { + return Ok(Arc::new(Utf8ViewBranchless { inner })); + })* + }; + } + try_branchless!(0, 1, 2, 3, 4); + + datafusion_common::exec_err!( + "Utf8View branchless filter only supports 0-4 elements, got {}", + in_array.len() - in_array.null_count() + ) +} + +/// Branchless filter for Utf8View short strings (≤12 bytes). +struct Utf8ViewBranchless { + inner: BranchlessFilter, +} + +impl StaticFilter for Utf8ViewBranchless { + fn null_count(&self) -> usize { + self.inner.null_count() + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + + let sv = v.as_string_view(); + let values = views_as_i128(sv.views()); + + Ok(self.inner.contains_slice(values, sv.nulls(), negated)) + } +} + +/// Mask to extract len + prefix from a Utf8View view (zeroes out buffer_index and offset). +/// +/// View layout (16 bytes, Little Endian): +/// - Bytes 0-3 (low): length (u32) +/// - Bytes 4-7: prefix (long strings) or inline data bytes 0-3 (short strings) +/// - Bytes 8-11: buffer_index (long) or inline data bytes 4-7 (short) +/// - Bytes 12-15 (high): offset (long) or inline data bytes 8-11 (short) +/// +/// For long strings (>12 bytes), buffer_index and offset are array-specific, +/// so we mask them out, keeping only len + prefix for comparison. +const VIEW_MASK_LONG: i128 = (1_i128 << 64) - 1; // Keep low 64 bits + +/// Computes the masked view for comparison. +/// +/// - Short strings (≤12 bytes): returns full view (all data is inline) +/// - Long strings (>12 bytes): returns only len + prefix (masks out buffer_index/offset) +#[inline(always)] +fn masked_view(view: i128) -> i128 { + let len = view_len(view) as usize; + + if len <= INLINE_STRING_LEN { + view // Short string: all 16 bytes are meaningful data + } else { + view & VIEW_MASK_LONG // Long string: keep only len + prefix + } +} + +/// Two-stage filter for ByteView arrays (Utf8View, BinaryView) with mixed lengths. +/// +/// Stage 1: Quick rejection using masked views (len + prefix as i128) +/// - Non-matches rejected without any hashing using DirectProbeFilter +/// - Short value matches (≤12 bytes) accepted immediately +/// +/// Stage 2: Full verification for long value matches +/// - Only reached when masked view matches AND value is long (>12 bytes) +/// - Uses HashTable lookup with indices into haystack array +pub(crate) struct ByteViewMaskedFilter { + /// The haystack array containing values to match against. + in_array: ArrayRef, + /// DirectProbeFilter for O(1) masked view quick rejection (faster than HashSet) + masked_view_filter: DirectProbeFilter, + /// HashTable storing indices of long strings for Stage 2 verification + long_value_table: HashTable, + /// Random state for consistent hashing between haystack and needles + state: RandomState, + _phantom: PhantomData, +} + +impl ByteViewMaskedFilter +where + T::Native: PartialEq, +{ + pub(crate) fn try_new(in_array: ArrayRef) -> Result { + let bv = in_array.as_byte_view::(); + let views = views_as_i128(bv.views()); + + let mut masked_views = Vec::with_capacity(in_array.len() - in_array.null_count()); + let state = RandomState::default(); + let mut long_value_table = HashTable::new(); + + let mut process_idx = |idx: usize| { + let view = views[idx]; + masked_views.push(masked_view(view)); + + let len = view_len(view) as usize; + if len > INLINE_STRING_LEN { + // Use the same byte hash used by Stage 2 probing. + // SAFETY: idx is valid from iterator + let val = unsafe { bv.value_unchecked(idx) }; + let bytes: &[u8] = val.as_ref(); + let hash = state.hash_one(bytes); + + // Only insert if not already present (deduplication) + if long_value_table + .find(hash, |&stored_idx| { + let stored: &[u8] = + unsafe { bv.value_unchecked(stored_idx) }.as_ref(); + stored == bytes + }) + .is_none() + { + long_value_table.insert_unique(hash, idx, |&i| { + let stored: &[u8] = unsafe { bv.value_unchecked(i) }.as_ref(); + state.hash_one(stored) + }); + } + } + }; + + match bv.nulls() { + Some(nulls) => { + BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len()) + .for_each(&mut process_idx); + } + None => { + (0..in_array.len()).for_each(&mut process_idx); + } + } + + // Build DirectProbeFilter from collected masked views + let masked_view_filter = + DirectProbeFilter::::from_values(masked_views.into_iter()); + + Ok(Self { + in_array, + masked_view_filter, + long_value_table, + state, + _phantom: PhantomData, + }) + } +} + +impl StaticFilter for ByteViewMaskedFilter +where + T::Native: PartialEq, +{ + fn null_count(&self) -> usize { + self.in_array.null_count() + } + + fn contains(&self, v: &dyn Array, negated: bool) -> Result { + handle_dictionary!(self, v, negated); + + let needle_bv = v.as_byte_view::(); + let needle_views = views_as_i128(needle_bv.views()); + let needle_null_count = needle_bv.null_count(); + let haystack_has_nulls = self.in_array.null_count() > 0; + let haystack_bv = self.in_array.as_byte_view::(); + + // Single pass with lazy hashing - only hash long values that pass Stage 1 + // Use null shortcircuit: Stage 2 string comparison is expensive, + // so skipping lookups for null positions is worth the branch overhead + Ok(build_in_list_result_with_null_shortcircuit( + v.len(), + needle_bv.nulls(), + needle_null_count, + haystack_has_nulls, + negated, + #[inline(always)] + |i| { + let needle_view = needle_views[i]; + let masked = masked_view(needle_view); + + // Stage 1: Quick rejection via DirectProbeFilter (O(1) lookup) + if !self.masked_view_filter.contains_single(masked) { + return false; + } + + // Masked view found in set + let needle_len = view_len(needle_view) as usize; + + if needle_len <= INLINE_STRING_LEN { + // Short value: masked view = full view, true match + return true; + } + + // Stage 2: Long value - hash lazily and lookup in hash table + // SAFETY: i is in bounds, closure only called for valid positions + let needle_val = unsafe { needle_bv.value_unchecked(i) }; + let needle_bytes: &[u8] = needle_val.as_ref(); + let hash = self.state.hash_one(needle_bytes); + + self.long_value_table + .find(hash, |&idx| { + let haystack_val: &[u8] = + unsafe { haystack_bv.value_unchecked(idx) }.as_ref(); + haystack_val == needle_bytes + }) + .is_some() + }, + )) + } +} + +/// Creates a two-stage filter for ByteView arrays (Utf8View, BinaryView). +pub(crate) fn make_byte_view_masked_filter( + in_array: ArrayRef, +) -> Result> +where + T::Native: PartialEq, +{ + Ok(Arc::new(ByteViewMaskedFilter::::try_new(in_array)?)) +} + #[cfg(test)] mod tests { use super::*; use std::sync::Arc; - use arrow::array::{ArrayRef, BooleanArray, Int8Array, Int16Array, Int32Array}; - use arrow::datatypes::UInt32Type; + use arrow::array::{ + ArrayRef, BooleanArray, Int8Array, Int16Array, Int32Array, StringViewArray, + }; + use arrow::datatypes::{StringViewType, UInt32Type}; #[test] fn reinterpreted_bitmap_handles_signed_boundaries_and_slices() -> Result<()> { @@ -298,4 +605,61 @@ mod tests { Ok(()) } + + #[test] + fn utf8view_hash_filter_handles_short_slices() -> Result<()> { + let haystack: ArrayRef = Arc::new( + StringViewArray::from(vec![ + Some("outside"), + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + Some("tail"), + ]) + .slice(1, 5), + ); + let filter = make_utf8view_hash_filter(&haystack)?; + let needles = + StringViewArray::from(vec![Some("outside"), Some("b"), Some("z"), Some("e")]) + .slice(1, 3); + + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), Some(false), Some(true)]) + ); + + Ok(()) + } + + #[test] + fn byte_view_masked_filter_verifies_long_string_matches() -> Result<()> { + let haystack: ArrayRef = Arc::new( + StringViewArray::from(vec![ + Some("outside"), + Some("abcdefghijklmn1"), + Some("short"), + Some("zzzzzzzzzzzzzz"), + Some("tail"), + ]) + .slice(1, 3), + ); + let filter = make_byte_view_masked_filter::(haystack)?; + let needles = StringViewArray::from(vec![ + Some("outside"), + Some("abcdefghijklmn1"), + Some("abcdefghijklmn2"), + Some("short"), + Some("tail"), + ]) + .slice(1, 3); + + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), Some(false), Some(true)]) + ); + + Ok(()) + } } From b62beb47c08ab93311db9464e1352644c4d04829 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Thu, 12 Mar 2026 12:39:56 +0100 Subject: [PATCH 11/11] Implement FixedSizeBinary zero-copy reinterpretation optimization FixedSizeBinary(N) arrays share the same contiguous buffer layout as primitive arrays, so for power-of-2 widths (1, 2, 4, 8, 16) we can zero-copy reinterpret them and use the optimized primitive filters (bitmap, branchless, hash) instead of falling through to the NestedTypeFilter fallback. --- .../physical-expr/src/expressions/in_list.rs | 132 ++++++++++++++++++ .../src/expressions/in_list/strategy.rs | 100 ++++++++++++- .../src/expressions/in_list/transform.rs | 20 ++- 3 files changed, 243 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index be73b0a9d11be..992b52f97b9cf 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -923,6 +923,138 @@ mod tests { ]) } + #[test] + fn test_in_list_fixed_size_binary_canonical_path() -> Result<()> { + let width = 16; + let matching = vec![0x10; width as usize]; + let also_in_list = vec![0x20; width as usize]; + let not_in_list = vec![0x30; width as usize]; + + let schema = Schema::new(vec![Field::new( + "a", + DataType::FixedSizeBinary(width), + true, + )]); + let col_a = col("a", &schema)?; + let list_array = Arc::new(FixedSizeBinaryArray::try_from(vec![ + matching.as_slice(), + also_in_list.as_slice(), + ])?) as ArrayRef; + let expr = Arc::new(InListExpr::try_new_from_array( + Arc::clone(&col_a), + list_array, + false, + &schema, + )?) as Arc; + + let batch_array = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size( + vec![ + Some(matching.as_slice()), + Some(not_in_list.as_slice()), + None, + ] + .into_iter(), + width, + )?) as ArrayRef; + let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?; + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + assert_eq!( + result, + &BooleanArray::from(vec![Some(true), Some(false), None]) + ); + + Ok(()) + } + + #[test] + fn test_in_list_fixed_size_binary_offset_path() -> Result<()> { + let width = 16; + let before_slice = vec![0x01; width as usize]; + let in_slice = vec![0x02; width as usize]; + let not_in_slice = vec![0x03; width as usize]; + + let schema = Schema::new(vec![Field::new( + "a", + DataType::FixedSizeBinary(width), + false, + )]); + let col_a = col("a", &schema)?; + let parent = Arc::new(FixedSizeBinaryArray::try_from(vec![ + before_slice.as_slice(), + in_slice.as_slice(), + not_in_slice.as_slice(), + ])?) as ArrayRef; + let sliced_list = parent.slice(1, 1); + let expr = Arc::new(InListExpr::try_new_from_array( + Arc::clone(&col_a), + sliced_list, + false, + &schema, + )?) as Arc; + + let batch_array = Arc::new(FixedSizeBinaryArray::try_from(vec![ + in_slice.as_slice(), + before_slice.as_slice(), + ])?) as ArrayRef; + let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?; + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + assert_eq!(result, &BooleanArray::from(vec![Some(true), Some(false)])); + + Ok(()) + } + + #[test] + fn test_in_list_fixed_size_binary_offset_path_with_nulls() -> Result<()> { + let width = 16; + let before_slice = vec![0xAA; width as usize]; + let in_slice = vec![0xBB; width as usize]; + let not_in_slice = vec![0xCC; width as usize]; + + let schema = Schema::new(vec![Field::new( + "a", + DataType::FixedSizeBinary(width), + true, + )]); + let col_a = col("a", &schema)?; + let parent = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size( + vec![ + Some(before_slice.as_slice()), + None, + Some(in_slice.as_slice()), + ] + .into_iter(), + width, + )?) as ArrayRef; + let sliced_list = parent.slice(1, 2); + let expr = Arc::new(InListExpr::try_new_from_array( + Arc::clone(&col_a), + sliced_list, + false, + &schema, + )?) as Arc; + + let batch_array = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size( + vec![ + Some(in_slice.as_slice()), + Some(not_in_slice.as_slice()), + None, + ] + .into_iter(), + width, + )?) as ArrayRef; + let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?; + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + assert_eq!(result, &BooleanArray::from(vec![Some(true), None, None])); + + Ok(()) + } + /// Test IN LIST for date types (Date32, Date64). /// /// Test data: 0 (in list), 2 (not in list), [1, 3] (other list values) diff --git a/datafusion/physical-expr/src/expressions/in_list/strategy.rs b/datafusion/physical-expr/src/expressions/in_list/strategy.rs index 0ad8523385df9..97d8387c78312 100644 --- a/datafusion/physical-expr/src/expressions/in_list/strategy.rs +++ b/datafusion/physical-expr/src/expressions/in_list/strategy.rs @@ -15,6 +15,19 @@ // specific language governing permissions and limitations // under the License. +//! Filter selection strategy for InList expressions +//! +//! Selects the optimal lookup strategy based on data type and list size: +//! +//! - 1-byte types (Int8/UInt8): bitmap (32 bytes, O(1) bit test) +//! - 2-byte types (Int16/UInt16): bitmap (8 KB, O(1) bit test) +//! - 4-byte types (Int32/Float32): branchless (up to 32) or hash +//! - 8-byte types (Int64/Float64): branchless (up to 16) or hash +//! - 16-byte types (Decimal128): branchless (up to 4) or hash +//! - Utf8View/BinaryView: short-string or masked-view filters +//! - FixedSizeBinary(1/2/4/8/16): reinterpreted as fixed-width values +//! - Other types: ArrayStaticFilter fallback + use std::sync::Arc; use arrow::array::ArrayRef; @@ -28,7 +41,8 @@ use super::static_filter::{StaticFilter, handle_dictionary}; use super::transform::{ make_bitmap_filter, make_branchless_filter, make_byte_view_masked_filter, make_utf8view_branchless_filter, make_utf8view_hash_filter, - reinterpret_any_primitive_to, utf8view_all_short_strings, + matches_reinterpreted_width, reinterpret_any_primitive_to, + utf8view_all_short_strings, }; /// Maximum list size for branchless lookup on 4-byte primitives (Int32, UInt32, Float32). @@ -109,6 +123,11 @@ pub(super) fn instantiate_static_filter( }; } + // FixedSizeBinary with power-of-2 width: reinterpret as primitive + if let &DataType::FixedSizeBinary(byte_width) = dt { + return instantiate_fixed_size_binary_filter(in_array, byte_width); + } + let strategy = select_strategy(dt, len); match (dt, strategy) { @@ -142,6 +161,44 @@ pub(super) fn instantiate_static_filter( } } +/// Creates the optimal filter for FixedSizeBinary(N) arrays. +/// +/// Widths 1, 2, 4, 8, and 16 share the same contiguous values-buffer layout +/// as the corresponding primitive width, so they can use the optimized +/// bitmap, branchless, or direct-probe filters without copying values. +fn instantiate_fixed_size_binary_filter( + in_array: ArrayRef, + byte_width: i32, +) -> Result> { + let len = in_array.len() - in_array.null_count(); + match byte_width { + 1 => make_bitmap_filter::(&in_array), + 2 => make_bitmap_filter::(&in_array), + 4 => { + if len <= BRANCHLESS_MAX_4B { + make_branchless_filter::(&in_array) + } else { + make_direct_probe_filter_reinterpreted::(&in_array) + } + } + 8 => { + if len <= BRANCHLESS_MAX_8B { + make_branchless_filter::(&in_array) + } else { + make_direct_probe_filter_reinterpreted::(&in_array) + } + } + 16 => { + if len <= BRANCHLESS_MAX_16B { + make_branchless_filter::(&in_array) + } else { + make_direct_probe_filter_reinterpreted::(&in_array) + } + } + _ => Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)), + } +} + fn dispatch_branchless( arr: &ArrayRef, ) -> Option>> { @@ -195,7 +252,7 @@ where if in_array.data_type() == &D::DATA_TYPE { return Ok(Arc::new(DirectProbeFilter::::try_new(in_array)?)); } - if in_array.data_type().primitive_width() != Some(size_of::()) { + if !matches_reinterpreted_width(in_array.data_type(), size_of::()) { return Err(exec_datafusion_err!( "DirectProbeFilter: expected {}-byte primitive array, got {}", size_of::(), @@ -233,7 +290,7 @@ where negated: bool, ) -> Result { handle_dictionary!(self, v, negated); - if v.data_type().primitive_width() != Some(size_of::()) { + if !matches_reinterpreted_width(v.data_type(), size_of::()) { return Err(exec_datafusion_err!( "DirectProbeFilter: expected {}-byte primitive array, got {}", size_of::(), @@ -251,7 +308,7 @@ where mod tests { use super::*; - use arrow::array::{ArrayRef, BooleanArray, Float64Array}; + use arrow::array::{ArrayRef, BooleanArray, FixedSizeBinaryArray, Float64Array}; #[test] fn direct_probe_strategy_handles_reinterpreted_slices() -> Result<()> { @@ -292,4 +349,39 @@ mod tests { Ok(()) } + + #[test] + fn fixed_size_binary_uses_bitmap_and_direct_probe_paths() -> Result<()> { + let haystack: ArrayRef = Arc::new(FixedSizeBinaryArray::try_from(vec![ + [0x12, 0x34].as_slice(), + [0xab, 0xcd].as_slice(), + ])?); + let filter = instantiate_static_filter(haystack)?; + let needles = FixedSizeBinaryArray::try_from(vec![ + [0xab, 0xcd].as_slice(), + [0xff, 0xff].as_slice(), + ])?; + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), Some(false)]) + ); + + let values: Vec<_> = (0_u32..33).map(u32::to_le_bytes).collect(); + let haystack_values: Vec<_> = values.iter().map(|v| v.as_slice()).collect(); + let haystack: ArrayRef = + Arc::new(FixedSizeBinaryArray::try_from(haystack_values)?); + let filter = instantiate_static_filter(haystack)?; + let direct_hit = 7_u32.to_le_bytes(); + let direct_miss = 99_u32.to_le_bytes(); + let needles = FixedSizeBinaryArray::try_from(vec![ + direct_hit.as_slice(), + direct_miss.as_slice(), + ])?; + assert_eq!( + filter.contains(&needles, false)?, + BooleanArray::from(vec![Some(true), Some(false)]) + ); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/in_list/transform.rs b/datafusion/physical-expr/src/expressions/in_list/transform.rs index d20215890283f..a78b807e14739 100644 --- a/datafusion/physical-expr/src/expressions/in_list/transform.rs +++ b/datafusion/physical-expr/src/expressions/in_list/transform.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray}; use arrow::buffer::ScalarBuffer; -use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, Decimal128Type}; +use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, DataType, Decimal128Type}; use arrow::util::bit_iterator::BitIndexIterator; use datafusion_common::hash_utils::RandomState; use datafusion_common::{Result, exec_datafusion_err}; @@ -49,6 +49,16 @@ fn views_as_i128(views: &ScalarBuffer) -> &[i128] { views.inner().typed_data() } +#[inline] +pub(crate) fn matches_reinterpreted_width(dt: &DataType, width: usize) -> bool { + dt.primitive_width() == Some(width) + || matches!( + dt, + DataType::FixedSizeBinary(byte_width) + if usize::try_from(*byte_width).ok() == Some(width) + ) +} + /// Reinterpreting filter for bitmap lookups (u8/u16). struct ReinterpretedBitmap { inner: BitmapFilter, @@ -62,7 +72,7 @@ impl StaticFilter for ReinterpretedBitmap { fn contains(&self, v: &dyn Array, negated: bool) -> Result { handle_dictionary!(self, v, negated); - if v.data_type().primitive_width() != Some(size_of::()) { + if !matches_reinterpreted_width(v.data_type(), size_of::()) { return Err(exec_datafusion_err!( "BitmapFilter: expected {}-byte primitive array, got {}", size_of::(), @@ -94,7 +104,7 @@ where fn contains(&self, v: &dyn Array, negated: bool) -> Result { handle_dictionary!(self, v, negated); - if v.data_type().primitive_width() != Some(size_of::()) { + if !matches_reinterpreted_width(v.data_type(), size_of::()) { return Err(exec_datafusion_err!( "BranchlessFilter: expected {}-byte primitive array, got {}", size_of::(), @@ -156,7 +166,7 @@ where if in_array.data_type() == &C::ArrowType::DATA_TYPE { return Ok(Arc::new(BitmapFilter::::try_new(in_array)?)); } - if in_array.data_type().primitive_width() != Some(size_of::()) { + if !matches_reinterpreted_width(in_array.data_type(), size_of::()) { return Err(exec_datafusion_err!( "BitmapFilter: expected {}-byte primitive array for {} bitmap, got {}", size_of::(), @@ -188,7 +198,7 @@ where let arr = if is_native { Arc::clone(in_array) } else { - if in_array.data_type().primitive_width() != Some(width) { + if !matches_reinterpreted_width(in_array.data_type(), width) { return Err(exec_datafusion_err!( "BranchlessFilter: expected {width}-byte primitive array, got {}", in_array.data_type()