Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17800,6 +17800,84 @@ pub fn vortex_array::scalar_fn::fns::select::Select::stat_falsification(&self, &

pub fn vortex_array::scalar_fn::fns::select::Select::validity(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub mod vortex_array::scalar_fn::fns::stat

pub struct vortex_array::scalar_fn::fns::stat::StatFn

impl core::clone::Clone for vortex_array::scalar_fn::fns::stat::StatFn

pub fn vortex_array::scalar_fn::fns::stat::StatFn::clone(&self) -> vortex_array::scalar_fn::fns::stat::StatFn

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::fns::stat::StatFn

pub type vortex_array::scalar_fn::fns::stat::StatFn::Options = vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatFn::arity(&self, &Self::Options) -> vortex_array::scalar_fn::Arity

pub fn vortex_array::scalar_fn::fns::stat::StatFn::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::ChildName

pub fn vortex_array::scalar_fn::fns::stat::StatFn::coerce_args(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_array::dtype::DType>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::ExecutionArgs, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::fmt_sql(&self, &Self::Options, &vortex_array::expr::Expression, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_array::scalar_fn::fns::stat::StatFn::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_array::scalar_fn::fns::stat::StatFn::is_fallible(&self, &Self::Options) -> bool

pub fn vortex_array::scalar_fn::fns::stat::StatFn::is_null_sensitive(&self, &Self::Options) -> bool

pub fn vortex_array::scalar_fn::fns::stat::StatFn::reduce(&self, &Self::Options, &dyn vortex_array::scalar_fn::ReduceNode, &dyn vortex_array::scalar_fn::ReduceCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar_fn::ReduceNodeRef>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::simplify(&self, &Self::Options, &vortex_array::expr::Expression, &dyn vortex_array::scalar_fn::SimplifyCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::simplify_untyped(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::stat_expression(&self, &Self::Options, &vortex_array::expr::Expression, vortex_array::expr::stats::Stat, &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::stat_falsification(&self, &Self::Options, &vortex_array::expr::Expression, &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::validity(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub struct vortex_array::scalar_fn::fns::stat::StatOptions

impl vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::aggregate_fn(&self) -> &vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::new(vortex_array::aggregate_fn::AggregateFnRef) -> Self

impl core::clone::Clone for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::clone(&self) -> vortex_array::scalar_fn::fns::stat::StatOptions

impl core::cmp::Eq for vortex_array::scalar_fn::fns::stat::StatOptions

impl core::cmp::PartialEq for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::eq(&self, &vortex_array::scalar_fn::fns::stat::StatOptions) -> bool

impl core::fmt::Debug for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::fmt::Display for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::hash::Hash for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::hash<__H: core::hash::Hasher>(&self, &mut __H)

impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::fns::stat::StatOptions

pub mod vortex_array::scalar_fn::fns::zip

pub struct vortex_array::scalar_fn::fns::zip::Zip
Expand Down Expand Up @@ -19066,6 +19144,44 @@ pub fn vortex_array::scalar_fn::fns::select::Select::stat_falsification(&self, &

pub fn vortex_array::scalar_fn::fns::select::Select::validity(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::fns::stat::StatFn

pub type vortex_array::scalar_fn::fns::stat::StatFn::Options = vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatFn::arity(&self, &Self::Options) -> vortex_array::scalar_fn::Arity

pub fn vortex_array::scalar_fn::fns::stat::StatFn::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::ChildName

pub fn vortex_array::scalar_fn::fns::stat::StatFn::coerce_args(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_array::dtype::DType>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::ExecutionArgs, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::fmt_sql(&self, &Self::Options, &vortex_array::expr::Expression, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_array::scalar_fn::fns::stat::StatFn::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_array::scalar_fn::fns::stat::StatFn::is_fallible(&self, &Self::Options) -> bool

pub fn vortex_array::scalar_fn::fns::stat::StatFn::is_null_sensitive(&self, &Self::Options) -> bool

pub fn vortex_array::scalar_fn::fns::stat::StatFn::reduce(&self, &Self::Options, &dyn vortex_array::scalar_fn::ReduceNode, &dyn vortex_array::scalar_fn::ReduceCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar_fn::ReduceNodeRef>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::simplify(&self, &Self::Options, &vortex_array::expr::Expression, &dyn vortex_array::scalar_fn::SimplifyCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::simplify_untyped(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::stat_expression(&self, &Self::Options, &vortex_array::expr::Expression, vortex_array::expr::stats::Stat, &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::stat_falsification(&self, &Self::Options, &vortex_array::expr::Expression, &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::validity(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::fns::zip::Zip

pub type vortex_array::scalar_fn::fns::zip::Zip::Options = vortex_array::scalar_fn::EmptyOptions
Expand Down Expand Up @@ -19450,6 +19566,86 @@ pub type vortex_array::session::ArrayRegistry = vortex_session::registry::Regist

pub mod vortex_array::stats

pub mod vortex_array::stats::expr

pub struct vortex_array::stats::expr::StatFn

impl core::clone::Clone for vortex_array::scalar_fn::fns::stat::StatFn

pub fn vortex_array::scalar_fn::fns::stat::StatFn::clone(&self) -> vortex_array::scalar_fn::fns::stat::StatFn

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::fns::stat::StatFn

pub type vortex_array::scalar_fn::fns::stat::StatFn::Options = vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatFn::arity(&self, &Self::Options) -> vortex_array::scalar_fn::Arity

pub fn vortex_array::scalar_fn::fns::stat::StatFn::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::ChildName

pub fn vortex_array::scalar_fn::fns::stat::StatFn::coerce_args(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_array::dtype::DType>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::ExecutionArgs, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::fmt_sql(&self, &Self::Options, &vortex_array::expr::Expression, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_array::scalar_fn::fns::stat::StatFn::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_array::scalar_fn::fns::stat::StatFn::is_fallible(&self, &Self::Options) -> bool

pub fn vortex_array::scalar_fn::fns::stat::StatFn::is_null_sensitive(&self, &Self::Options) -> bool

pub fn vortex_array::scalar_fn::fns::stat::StatFn::reduce(&self, &Self::Options, &dyn vortex_array::scalar_fn::ReduceNode, &dyn vortex_array::scalar_fn::ReduceCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar_fn::ReduceNodeRef>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::simplify(&self, &Self::Options, &vortex_array::expr::Expression, &dyn vortex_array::scalar_fn::SimplifyCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::simplify_untyped(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::stat_expression(&self, &Self::Options, &vortex_array::expr::Expression, vortex_array::expr::stats::Stat, &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::stat_falsification(&self, &Self::Options, &vortex_array::expr::Expression, &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::fns::stat::StatFn::validity(&self, &Self::Options, &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub struct vortex_array::stats::expr::StatOptions

impl vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::aggregate_fn(&self) -> &vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::new(vortex_array::aggregate_fn::AggregateFnRef) -> Self

impl core::clone::Clone for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::clone(&self) -> vortex_array::scalar_fn::fns::stat::StatOptions

impl core::cmp::Eq for vortex_array::scalar_fn::fns::stat::StatOptions

impl core::cmp::PartialEq for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::eq(&self, &vortex_array::scalar_fn::fns::stat::StatOptions) -> bool

impl core::fmt::Debug for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::fmt::Display for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::hash::Hash for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::scalar_fn::fns::stat::StatOptions::hash<__H: core::hash::Hasher>(&self, &mut __H)

impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::stats::expr::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression

pub mod vortex_array::stats::flatbuffers

pub struct vortex_array::stats::ArrayStats
Expand Down Expand Up @@ -19682,6 +19878,8 @@ pub const vortex_array::stats::PRUNING_STATS: &[vortex_array::expr::stats::Stat]

pub fn vortex_array::stats::as_stat_bitset_bytes(&[vortex_array::expr::stats::Stat]) -> alloc::vec::Vec<u8>

pub fn vortex_array::stats::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression

pub fn vortex_array::stats::stats_from_bitset_bytes(&[u8]) -> alloc::vec::Vec<vortex_array::expr::stats::Stat>

pub type vortex_array::stats::StatsArray = [(vortex_array::expr::stats::Stat, vortex_array::expr::stats::Precision<vortex_array::scalar::ScalarValue>); 4]
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/scalar_fn/fns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ pub mod operators;
pub mod pack;
pub mod root;
pub mod select;
pub mod stat;
pub mod zip;
141 changes: 141 additions & 0 deletions vortex-array/src/scalar_fn/fns/stat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Scalar function implementation for aggregate-backed stat expressions.

use std::fmt::Display;
use std::fmt::Formatter;

use vortex_error::VortexResult;
use vortex_error::vortex_bail;

use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::aggregate_fn::AggregateFnRef;
use crate::arrays::Chunked;
use crate::arrays::ChunkedArray;
use crate::arrays::ConstantArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::dtype::DType;
use crate::expr::Expression;
use crate::expr::stats::StatsProvider;
use crate::scalar::Scalar;
use crate::scalar_fn::Arity;
use crate::scalar_fn::ChildName;
use crate::scalar_fn::ExecutionArgs;
use crate::scalar_fn::ScalarFnId;
use crate::scalar_fn::ScalarFnVTable;
use crate::stats::legacy::legacy_stat_for_aggregate;

/// Options for the `stat` scalar function.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StatOptions {
aggregate_fn: AggregateFnRef,
}

impl StatOptions {
/// Creates options for the provided aggregate statistic.
pub fn new(aggregate_fn: AggregateFnRef) -> Self {
Self { aggregate_fn }
}

/// Returns the aggregate function backing this statistic lookup.
pub fn aggregate_fn(&self) -> &AggregateFnRef {
&self.aggregate_fn
}
}

impl Display for StatOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.aggregate_fn, f)
}
}

/// Scalar function that evaluates stored aggregate statistics.
#[derive(Clone)]
pub struct StatFn;

impl ScalarFnVTable for StatFn {
type Options = StatOptions;

fn id(&self) -> ScalarFnId {
ScalarFnId::new("vortex.stat")
}

fn arity(&self, _options: &Self::Options) -> Arity {
Arity::Exact(1)
}

fn child_name(&self, _options: &Self::Options, child_idx: usize) -> ChildName {
match child_idx {
0 => ChildName::from("input"),
_ => unreachable!("Invalid child index {} for Stat expression", child_idx),
}
}

fn fmt_sql(
&self,
options: &Self::Options,
expr: &Expression,
f: &mut Formatter<'_>,
) -> std::fmt::Result {
write!(f, "stat(")?;
expr.child(0).fmt_sql(f)?;
write!(f, ", {})", options.aggregate_fn())
}

fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult<DType> {
stat_dtype(options.aggregate_fn(), &arg_dtypes[0])
}

fn execute(
&self,
options: &Self::Options,
args: &dyn ExecutionArgs,
_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let input = args.get(0)?;
let dtype = stat_dtype(options.aggregate_fn(), input.dtype())?;

if let Some(chunked) = input.as_opt::<Chunked>() {
let chunks = chunked
.iter_chunks()
.map(|chunk| stat_array(chunk, options.aggregate_fn(), dtype.clone(), chunk.len()))
.collect::<VortexResult<Vec<_>>>()?;
return Ok(ChunkedArray::try_new(chunks, dtype)?.into_array());
}

stat_array(&input, options.aggregate_fn(), dtype, args.row_count())
}
}

fn stat_dtype(aggregate_fn: &AggregateFnRef, input_dtype: &DType) -> VortexResult<DType> {
let Some(dtype) = aggregate_fn.return_dtype(input_dtype) else {
vortex_bail!(
"Aggregate function {} does not support input dtype {}",
aggregate_fn,
input_dtype
);
};
Ok(dtype.as_nullable())
}

fn stat_array(
array: &ArrayRef,
aggregate_fn: &AggregateFnRef,
dtype: DType,
len: usize,
) -> VortexResult<ArrayRef> {
let value = legacy_stat_for_aggregate(aggregate_fn)
.and_then(|stat| {
array
.statistics()
.with_typed_stats_set(|stats| stats.get(stat))
})
.and_then(|stat| stat.as_exact())
.and_then(Scalar::into_value);

let scalar = Scalar::try_new(dtype, value)?;
Ok(ConstantArray::new(scalar, len).into_array())
}
2 changes: 2 additions & 0 deletions vortex-array/src/scalar_fn/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::scalar_fn::fns::not::Not;
use crate::scalar_fn::fns::pack::Pack;
use crate::scalar_fn::fns::root::Root;
use crate::scalar_fn::fns::select::Select;
use crate::scalar_fn::fns::stat::StatFn;

/// Registry of scalar function vtables.
pub type ScalarFnRegistry = Registry<ScalarFnPluginRef>;
Expand Down Expand Up @@ -70,6 +71,7 @@ impl Default for ScalarFnSession {
this.register(Pack);
this.register(Root);
this.register(Select);
this.register(StatFn);

this
}
Expand Down
Loading
Loading