Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
520 changes: 520 additions & 0 deletions encodings/experimental/onpair/src/compute/like.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions encodings/experimental/onpair/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ mod byte_length;
mod cast;
mod compare;
mod filter;
mod like;
mod slice;
3 changes: 3 additions & 0 deletions encodings/experimental/onpair/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use vortex_array::scalar_fn::fns::binary::Binary;
use vortex_array::scalar_fn::fns::binary::CompareExecuteAdaptor;
use vortex_array::scalar_fn::fns::byte_length::ByteLength;
use vortex_array::scalar_fn::fns::byte_length::ByteLengthExecuteAdaptor;
use vortex_array::scalar_fn::fns::like::Like;
use vortex_array::scalar_fn::fns::like::LikeExecuteAdaptor;
use vortex_session::VortexSession;

use crate::OnPair;
Expand All @@ -24,4 +26,5 @@ pub(super) fn initialize(session: &VortexSession) {
OnPair,
ByteLengthExecuteAdaptor(OnPair),
);
kernels.register_execute_parent_kernel(Like.id(), OnPair, LikeExecuteAdaptor(OnPair));
}
107 changes: 107 additions & 0 deletions vortex-array/src/arrays/dict/compute/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use crate::EqMode;
use crate::IntoArray;
use crate::array::ArrayView;
use crate::array::VTable;
use crate::arrays::Chunked;
use crate::arrays::ChunkedArray;
use crate::arrays::Constant;
use crate::arrays::ConstantArray;
use crate::arrays::Dict;
use crate::arrays::DictArray;
use crate::arrays::ScalarFn;
use crate::arrays::ScalarFnArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::arrays::dict::DictArrayExt;
use crate::arrays::dict::DictArraySlotsExt;
use crate::arrays::filter::FilterReduceAdaptor;
Expand All @@ -37,11 +40,59 @@ pub(crate) const PARENT_RULES: ParentRuleSet<Dict> = ParentRuleSet::new(&[
ParentRuleSet::lift(&CastReduceAdaptor(Dict)),
ParentRuleSet::lift(&MaskReduceAdaptor(Dict)),
ParentRuleSet::lift(&LikeReduceAdaptor(Dict)),
ParentRuleSet::lift(&DictionaryChunkedValuesPullUpRule),
ParentRuleSet::lift(&DictionaryScalarFnValuesPushDownRule),
ParentRuleSet::lift(&DictionaryScalarFnCodesPullUpRule),
ParentRuleSet::lift(&SliceReduceAdaptor(Dict)),
]);

/// Hoists a dictionary values array that is shared by every chunk above the chunked codes.
///
/// `Chunked<Dict<codes_i, values>>` is rewritten to `Dict<Chunked<codes_i>, values>`. The rewrite
/// is applied only when every chunk of the parent is a dictionary whose values array is
/// pointer-identical to this dictionary's values and whose codes have the same dtype as this
/// dictionary's codes. In every other case the parent is left untouched.
#[derive(Debug)]
struct DictionaryChunkedValuesPullUpRule;

impl ArrayParentReduceRule<Dict> for DictionaryChunkedValuesPullUpRule {
    type Parent = Chunked;

    /// Returns the pulled-up dictionary, or `Ok(None)` when any chunk is not compatible.
    fn reduce_parent(
        &self,
        array: ArrayView<'_, Dict>,
        parent: ArrayView<'_, Chunked>,
        _child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        let shared_values = array.values();
        let code_dtype = array.codes().dtype().clone();
        let mut covers_all_values = array.has_all_values_referenced();
        let mut chunk_codes = Vec::with_capacity(parent.nchunks());

        for chunk in parent.iter_chunks() {
            let Some(chunk_dict) = chunk.as_opt::<Dict>() else {
                return Ok(None);
            };
            // Every chunk must agree on the codes dtype and point at the very same values.
            let compatible = chunk_dict.codes().dtype() == &code_dtype
                && ArrayRef::ptr_eq(chunk_dict.values(), shared_values);
            if !compatible {
                return Ok(None);
            }
            // One chunk carrying the flag is enough: the combined codes include that chunk's.
            covers_all_values |= chunk_dict.has_all_values_referenced();
            chunk_codes.push(chunk_dict.codes().clone());
        }

        let chunked_codes = ChunkedArray::try_new(chunk_codes, code_dtype)?.into_array();
        let merged = DictArray::try_new(chunked_codes, shared_values.clone())?;
        if !covers_all_values {
            return Ok(Some(merged.into_array()));
        }

        // SAFETY: the flag is only set when some chunk dictionary over these same values already
        // carried it, and the merged codes contain that chunk's codes.
        // NOTE(review): relies on the flag meaning "every value is referenced by some code" --
        // confirm against `set_all_values_referenced`'s safety contract.
        let merged = unsafe { merged.set_all_values_referenced(true) };
        Ok(Some(merged.into_array()))
    }
}

/// Push down a scalar function to run only over the values of a dictionary array.
#[derive(Debug)]
struct DictionaryScalarFnValuesPushDownRule;
Expand Down Expand Up @@ -214,16 +265,72 @@ mod tests {
use vortex_buffer::buffer;
use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::BoolArray;
use crate::arrays::Chunked;
use crate::arrays::ChunkedArray;
use crate::arrays::Dict;
use crate::arrays::DictArray;
use crate::arrays::PrimitiveArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::arrays::dict::DictArrayExt;
use crate::arrays::dict::DictArraySlotsExt;
use crate::arrays::scalar_fn::ScalarFnFactoryExt;
use crate::assert_arrays_eq;
use crate::executor::VortexSessionExecute;
use crate::optimizer::ArrayOptimizer;
use crate::scalar_fn::EmptyOptions;
use crate::scalar_fn::fns::not::Not;

/// Two dictionary chunks built over the same values array must optimize into a single
/// dictionary whose codes are chunked and whose values are that same array.
#[test]
fn chunked_dict_with_shared_values_pulls_values_up() -> VortexResult<()> {
    let shared_values = buffer![10u32, 20, 30].into_array();
    let first = DictArray::try_new(buffer![0u8, 1].into_array(), shared_values.clone())?;
    let second = DictArray::try_new(buffer![2u8, 0, 1].into_array(), shared_values.clone())?;
    let chunked = ChunkedArray::try_new(
        vec![first.into_array(), second.into_array()],
        shared_values.dtype().clone(),
    )?
    .into_array();

    let optimized = chunked.optimize()?;
    let pulled_up = optimized.as_::<Dict>();
    let pulled_up_codes = pulled_up.codes().as_::<Chunked>();

    // The values must be the identical array, not merely an equal copy.
    assert!(ArrayRef::ptr_eq(pulled_up.values(), &shared_values));
    assert_eq!(pulled_up_codes.nchunks(), 2);

    let expected = PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]);
    let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
    assert_arrays_eq!(optimized, expected, &mut ctx);

    Ok(())
}

/// Dictionary chunks whose values are equal in content but are separate arrays must not be
/// merged: the optimized array stays chunked and still decodes to the same data.
#[test]
fn chunked_dict_with_distinct_values_stays_chunked() -> VortexResult<()> {
    let first_values = buffer![10u32, 20, 30].into_array();
    let second_values = buffer![10u32, 20, 30].into_array();
    let value_dtype = first_values.dtype().clone();

    let first = DictArray::try_new(buffer![0u8, 1].into_array(), first_values)?;
    let second = DictArray::try_new(buffer![2u8, 0, 1].into_array(), second_values)?;
    let chunked =
        ChunkedArray::try_new(vec![first.into_array(), second.into_array()], value_dtype)?
            .into_array();

    let optimized = chunked.optimize()?;

    assert!(optimized.is::<Chunked>());

    let expected = PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]);
    let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
    assert_arrays_eq!(optimized, expected, &mut ctx);

    Ok(())
}

#[test]
fn scalar_fn_values_pushdown_preserves_all_values_referenced() -> VortexResult<()> {
let dict = unsafe {
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/arrays/dict/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use vortex_session::VortexSession;

use crate::ArrayVTable;
use crate::arrays::Chunked;
use crate::arrays::Dict;
use crate::arrays::dict::TakeExecuteAdaptor;
use crate::optimizer::kernels::ArrayKernelsExt;
Expand All @@ -16,6 +17,7 @@ use crate::scalar_fn::fns::fill_null::FillNullExecuteAdaptor;
/// Registers the dictionary-related execute-parent kernels on the session's kernel registry.
///
/// Registration order is kept exactly as written: compare, take over chunked, take over dict,
/// then fill-null.
pub(crate) fn initialize(session: &VortexSession) {
    let registry = session.kernels();

    registry.register_execute_parent_kernel(Binary.id(), Dict, CompareExecuteAdaptor(Dict));

    // Take kernels keyed by `Dict.id()`, one per child encoding.
    registry.register_execute_parent_kernel(Dict.id(), Chunked, TakeExecuteAdaptor(Chunked));
    registry.register_execute_parent_kernel(Dict.id(), Dict, TakeExecuteAdaptor(Dict));

    registry.register_execute_parent_kernel(FillNull.id(), Dict, FillNullExecuteAdaptor(Dict));
}
9 changes: 6 additions & 3 deletions vortex-array/src/arrays/filter/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ use crate::kernel::ExecuteParentKernel;
use crate::matcher::Matcher;
use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::rules::ArrayParentReduceRule;
use crate::scalar_fn::ScalarFnVTable;
use crate::scalar_fn::fns::like::Like;
use crate::scalar_fn::fns::like::LikeFilterExecuteAdaptor;

/// Registers the execute-parent kernels that apply when the child array is a [`Filter`].
///
/// Each kernel is registered exactly once against a single registry handle: the take kernel
/// keyed by `Dict.id()`, and the LIKE kernel keyed by `Like.id()`.
pub(crate) fn initialize(session: &VortexSession) {
    let kernels = session.kernels();
    kernels.register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
    kernels.register_execute_parent_kernel(Like.id(), Filter, LikeFilterExecuteAdaptor);
}

pub trait FilterReduce: VTable {
Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/arrays/shared/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ impl VTable for Shared {
.get_or_compute(|source| source.clone().execute::<Canonical>(ctx))
.map(ExecutionResult::done)
}

/// Delegates parent reduction to the array this `Shared` wrapper currently holds.
///
/// The wrapped array's `reduce_parent` is called with the same `parent` and `child_idx`, and
/// its result is returned unchanged.
// NOTE(review): `parent` still has the `Shared` wrapper at `child_idx`, not the wrapped array --
// confirm that the delegated rules tolerate this.
fn reduce_parent(
    array: ArrayView<'_, Self>,
    parent: &ArrayRef,
    child_idx: usize,
) -> VortexResult<Option<ArrayRef>> {
    let wrapped = array.current_array_ref();
    wrapped.reduce_parent(parent, child_idx)
}
}
impl OperationsVTable<Shared> for Shared {
fn scalar_at(
Expand Down
31 changes: 31 additions & 0 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::array::ArrayId;
use crate::arrays::Shared;
use crate::arrays::shared::SharedArrayExt;
use crate::builders::ArrayBuilder;
use crate::builders::builder_with_capacity_in;
use crate::dtype::DType;
Expand Down Expand Up @@ -568,6 +570,35 @@ fn execute_parent_for_child(
slot_idx: usize,
kernels: &ParentExecutionKernels,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
if let Some(result) = execute_parent_for_exact_child(parent, child, slot_idx, kernels, ctx)? {
return Ok(Some(result));
}

// Shared is a transparent cache wrapper. Try kernels against the wrapped source/current array
// before forcing Shared to canonicalize and populate its cache.
let mut current = child.clone();
while let Some(source) = current
.as_opt::<Shared>()
.map(|shared| shared.current_array_ref().clone())
{
if let Some(result) =
execute_parent_for_exact_child(parent, &source, slot_idx, kernels, ctx)?
{
return Ok(Some(result));
}
current = source;
}

Ok(None)
}

fn execute_parent_for_exact_child(
parent: &ArrayRef,
child: &ArrayRef,
slot_idx: usize,
kernels: &ParentExecutionKernels,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
let key = execute_parent_key(parent.encoding_id(), child.encoding_id());
if let Some(plugins) = kernels.get(&key) {
Expand Down
45 changes: 45 additions & 0 deletions vortex-array/src/scalar_fn/fns/like/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::array::ArrayView;
use crate::array::VTable;
use crate::arrays::Filter;
use crate::arrays::ScalarFn;
use crate::arrays::ScalarFnArray;
use crate::arrays::scalar_fn::ExactScalarFn;
use crate::arrays::scalar_fn::ScalarFnArrayExt;
use crate::arrays::scalar_fn::ScalarFnArrayView;
Expand Down Expand Up @@ -105,3 +108,45 @@ where
<V as LikeKernel>::like(array, pattern, options, ctx)
}
}

/// Execute-parent kernel that materializes a filtered LIKE input before LIKE is evaluated.
///
/// When the first child of a LIKE scalar function is a [`Filter`], the filter is executed and
/// the LIKE array is rebuilt with the executed result in place of that child. The intent is to
/// preserve sparse row demand: the regular child-specific LIKE kernel can then run over only the
/// selected rows.
#[derive(Default, Debug)]
pub struct LikeFilterExecuteAdaptor;

impl ExecuteParentKernel<Filter> for LikeFilterExecuteAdaptor {
    type Parent = ExactScalarFn<LikeExpr>;

    /// Returns the rebuilt LIKE array, or `Ok(None)` when the filter is not child 0.
    fn execute_parent(
        &self,
        array: ArrayView<'_, Filter>,
        parent: ScalarFnArrayView<'_, LikeExpr>,
        child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        // Only the first child is rewritten; a filter in any other slot is left alone.
        if child_idx != 0 {
            return Ok(None);
        }

        let like_array = parent
            .as_opt::<ScalarFn>()
            .vortex_expect("ExactScalarFn matcher confirmed ScalarFnArray");

        // Run the filter now so the rebuilt LIKE sees the executed result, not the Filter node.
        let materialized = array.array().clone().execute::<ArrayRef>(ctx)?;

        let rebuilt_children = like_array
            .iter_children()
            .enumerate()
            .map(|(position, existing)| {
                if position != child_idx {
                    existing.clone()
                } else {
                    materialized.clone()
                }
            })
            .collect();

        let rebuilt = ScalarFnArray::try_new(like_array.scalar_fn().clone(), rebuilt_children)?;
        Ok(Some(rebuilt.into_array()))
    }
}
13 changes: 12 additions & 1 deletion vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use vortex_array::dtype::FieldPath;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_btrblocks::SchemeExt;
use vortex_btrblocks::schemes::integer::IntDictScheme;
#[cfg(feature = "unstable_encodings")]
use vortex_btrblocks::schemes::string::OnPairScheme;
use vortex_bytebool::ByteBool;
use vortex_datetime_parts::DateTimeParts;
use vortex_decimal_byte_parts::DecimalByteParts;
Expand Down Expand Up @@ -160,8 +162,17 @@ impl Default for WriteStrategyBuilder {
/// Create a new empty builder. It can be further configured,
/// and then finally built yielding the [`LayoutStrategy`].
fn default() -> Self {
#[cfg_attr(not(feature = "unstable_encodings"), allow(unused_mut))]
let mut compressor = BtrBlocksCompressorBuilder::default();
#[cfg(feature = "unstable_encodings")]
{
// OnPair currently optimizes for compressed size, but its string predicate kernels are
// not yet competitive with FSST for the scan-heavy default file format.
compressor = compressor.exclude_schemes([OnPairScheme.id()]);
}

Self {
compressor: CompressorConfig::BtrBlocks(BtrBlocksCompressorBuilder::default()),
compressor: CompressorConfig::BtrBlocks(compressor),
row_block_size: 8192,
field_writers: HashMap::new(),
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
Expand Down
Loading