diff --git a/Cargo.lock b/Cargo.lock index 66457601e06..e24bdd12075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10088,6 +10088,7 @@ dependencies = [ "rstest", "vortex-array", "vortex-error", + "vortex-layout", "vortex-session", "wkb", ] diff --git a/vortex-array/src/aggregate_fn/plugin.rs b/vortex-array/src/aggregate_fn/plugin.rs index b7ff8b893ac..e14053f6a24 100644 --- a/vortex-array/src/aggregate_fn/plugin.rs +++ b/vortex-array/src/aggregate_fn/plugin.rs @@ -10,6 +10,7 @@ use crate::aggregate_fn::AggregateFn; use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTable; +use crate::dtype::DType; /// Reference-counted pointer to an aggregate function plugin. pub type AggregateFnPluginRef = Arc; @@ -28,6 +29,9 @@ pub trait AggregateFnPlugin: 'static + Send + Sync { /// Deserialize an aggregate function from serialized metadata. fn deserialize(&self, metadata: &[u8], session: &VortexSession) -> VortexResult; + + /// The default per-chunk zone statistic to store for a column of `input_dtype`, or `None` if this aggregate isn't one. + fn zone_stat_default(&self, input_dtype: &DType) -> Option; } impl std::fmt::Debug for dyn AggregateFnPlugin { @@ -51,4 +55,8 @@ impl AggregateFnPlugin for V { let options = AggregateFnVTable::deserialize(self, metadata, session)?; Ok(AggregateFn::new(self.clone(), options).erased()) } + + fn zone_stat_default(&self, input_dtype: &DType) -> Option { + AggregateFnVTable::zone_stat_default(self, input_dtype) + } } diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index b309fd02a10..0e4b9e563bb 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -9,6 +9,7 @@ use vortex_session::SessionVar; use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnPluginRef; +use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::fns::all_nan::AllNan; use crate::aggregate_fn::fns::all_non_distinct::AllNonDistinct; @@ -43,6 +44,7 @@ use crate::arrays::chunked::compute::aggregate::ChunkedArrayAggregate; use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; +use crate::dtype::DType; /// Session state for aggregate functions and encoding-specific aggregate kernels. /// @@ -133,6 +135,17 @@ impl AggregateFnSession { self.registry.insert(id, pluginref); } + /// The default per-chunk zone statistics for a column of `input_dtype`, collected from every + /// registered aggregate's `zone_stat_default`. + pub fn zone_stat_defaults(&self, input_dtype: &DType) -> Vec { + self.registry.read(|registry| { + registry + .values() + .filter_map(|plugin| plugin.zone_stat_default(input_dtype)) + .collect() + }) + } + /// Returns the aggregate kernel registered for `array_id` and `agg_fn_id`, if any. /// /// Lookup first checks for a kernel registered for the exact aggregate function, then falls diff --git a/vortex-array/src/aggregate_fn/vtable.rs b/vortex-array/src/aggregate_fn/vtable.rs index 49b28dd26d7..372a1287fce 100644 --- a/vortex-array/src/aggregate_fn/vtable.rs +++ b/vortex-array/src/aggregate_fn/vtable.rs @@ -93,6 +93,13 @@ pub trait AggregateFnVTable: 'static + Sized + Clone + Send + Sync { /// Returns `None` if the aggregate function cannot be applied to the input dtype. fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option; + /// If this aggregate should be computed as a default zone statistic for `input_dtype`, return + /// the bound aggregate to store. Default: not a zone-map default. + fn zone_stat_default(&self, input_dtype: &DType) -> Option { + let _ = input_dtype; + None + } + /// DType of the intermediate partial accumulator state. /// /// Use a struct dtype when multiple fields are needed diff --git a/vortex-geo/Cargo.toml b/vortex-geo/Cargo.toml index e2f7e4dc10f..49fa9b81869 100644 --- a/vortex-geo/Cargo.toml +++ b/vortex-geo/Cargo.toml @@ -28,6 +28,7 @@ wkb = { workspace = true } [dev-dependencies] rstest = { workspace = true } +vortex-layout = { workspace = true } [lints] workspace = true diff --git a/vortex-geo/src/aggregate_fn/bounds.rs b/vortex-geo/src/aggregate_fn/bounds.rs new file mode 100644 index 00000000000..1d31f9e3ff1 --- /dev/null +++ b/vortex-geo/src/aggregate_fn/bounds.rs @@ -0,0 +1,340 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! An aggregate computing the minimum bounding rectangle (2D) of a native +//! geometry column as `Struct`. Stored as a zone statistic, it lets spatial +//! filters prune chunks whose bounding box cannot intersect the query region. + +use vortex_array::ArrayRef; +use vortex_array::Columnar; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::aggregate_fn::AggregateFnId; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::AggregateFnVTableExt; +use vortex_array::aggregate_fn::EmptyOptions; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::struct_::StructArrayExt; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::dtype::StructFields; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use crate::extension::coordinates; +use crate::extension::is_native_geometry; + +/// Aggregate computing the minimum bounding rectangle of a native geometry column, as +/// `Struct` of `f64`. +#[derive(Clone, Debug)] +pub struct GeometryBounds; + +/// An axis-aligned bounding box: four `f64`s, all this aggregate needs to min/max coordinates. +#[derive(Clone, Copy)] +struct Bbox { + xmin: f64, + ymin: f64, + xmax: f64, + ymax: f64, +} + +impl Bbox { + /// The smallest box containing both `self` and `other`. + fn union(self, other: Bbox) -> Bbox { + Bbox { + xmin: self.xmin.min(other.xmin), + ymin: self.ymin.min(other.ymin), + xmax: self.xmax.max(other.xmax), + ymax: self.ymax.max(other.ymax), + } + } +} + +/// Partial MBR accumulator: the union of every bounding box seen so far, or `None` when empty. +pub struct BoundsPartial { + bbox: Option, +} + +impl BoundsPartial { + fn merge(&mut self, other: Bbox) { + self.bbox = Some(match self.bbox { + Some(cur) => cur.union(other), + None => other, + }); + } +} + +/// `Struct` of `f64`, nullable so an empty group yields a null MBR. The +/// coordinate fields are themselves nullable so that extracting one from the nullable struct (as the +/// pruning proof does) keeps a consistent nullable dtype. +fn bounds_dtype() -> DType { + let coord = DType::Primitive(PType::F64, Nullability::Nullable); + DType::Struct( + StructFields::from_iter([ + ("xmin", coord.clone()), + ("ymin", coord.clone()), + ("xmax", coord.clone()), + ("ymax", coord), + ]), + Nullability::Nullable, + ) +} + +/// The bounding box of the coordinate slices, or `None` for an empty chunk. +fn bounds_of(xs: &[f64], ys: &[f64]) -> Option { + if xs.is_empty() { + return None; + } + let min_max = |vals: &[f64]| { + vals.iter() + .fold((f64::INFINITY, f64::NEG_INFINITY), |(lo, hi), &v| { + (lo.min(v), hi.max(v)) + }) + }; + let (xmin, xmax) = min_max(xs); + let (ymin, ymax) = min_max(ys); + Some(Bbox { + xmin, + ymin, + xmax, + ymax, + }) +} + +impl AggregateFnVTable for GeometryBounds { + type Options = EmptyOptions; + type Partial = BoundsPartial; + + fn id(&self) -> AggregateFnId { + AggregateFnId::new("vortex.geo.bounds") + } + + // Serializable so the zoned writer can persist this as a per-chunk stat. No options to encode. + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize(&self, _metadata: &[u8], _session: &VortexSession) -> VortexResult { + Ok(EmptyOptions) + } + + fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { + is_native_geometry(input_dtype).then(bounds_dtype) + } + + fn zone_stat_default(&self, input_dtype: &DType) -> Option { + // Geometry columns get a per-chunk bounding box for pruning. + is_native_geometry(input_dtype).then(|| self.bind(EmptyOptions)) + } + + fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option { + self.return_dtype(options, input_dtype) + } + + fn empty_partial( + &self, + _options: &Self::Options, + _input_dtype: &DType, + ) -> VortexResult { + Ok(BoundsPartial { bbox: None }) + } + + fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> { + if other.is_null() { + return Ok(()); + } + let fields = other.as_struct(); + let read = |name: &str| -> VortexResult { + f64::try_from( + &fields + .field(name) + .ok_or_else(|| vortex_err!("bounds missing {name}"))?, + ) + }; + partial.merge(Bbox { + xmin: read("xmin")?, + ymin: read("ymin")?, + xmax: read("xmax")?, + ymax: read("ymax")?, + }); + Ok(()) + } + + fn to_scalar(&self, partial: &Self::Partial) -> VortexResult { + Ok(match partial.bbox { + Some(b) => Scalar::struct_( + bounds_dtype(), + vec![ + Scalar::primitive(b.xmin, Nullability::Nullable), + Scalar::primitive(b.ymin, Nullability::Nullable), + Scalar::primitive(b.xmax, Nullability::Nullable), + Scalar::primitive(b.ymax, Nullability::Nullable), + ], + ), + None => Scalar::null(bounds_dtype()), + }) + } + + fn reset(&self, partial: &mut Self::Partial) { + partial.bbox = None; + } + + fn is_saturated(&self, _partial: &Self::Partial) -> bool { + // A bounding box can always grow, so it is never saturated. + false + } + + fn accumulate( + &self, + partial: &mut Self::Partial, + batch: &Columnar, + ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + let array = match batch { + Columnar::Canonical(canonical) => canonical.clone().into_array(), + Columnar::Constant(constant) => constant.clone().into_array(), + }; + let coords = coordinates(&array, ctx)?; + let xs = coords + .unmasked_field_by_name("x")? + .clone() + .execute::(ctx)?; + let ys = coords + .unmasked_field_by_name("y")? + .clone() + .execute::(ctx)?; + if let Some(bbox) = bounds_of(xs.as_slice::(), ys.as_slice::()) { + partial.merge(bbox); + } + Ok(()) + } + + fn finalize(&self, partials: ArrayRef) -> VortexResult { + // The stored partial is already the MBR struct, so finalizing is the identity. + Ok(partials) + } + + fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult { + self.to_scalar(partial) + } +} + +#[cfg(test)] +mod tests { + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::Accumulator; + use vortex_array::aggregate_fn::AggregateFnVTable; + use vortex_array::aggregate_fn::DynAccumulator; + use vortex_array::aggregate_fn::EmptyOptions; + use vortex_array::scalar::Scalar; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + + use super::GeometryBounds; + use crate::test_harness::point_column; + use crate::test_harness::polygon_column; + + /// The aggregate must be serializable so the zoned writer can persist its zone-stat descriptor. + #[test] + fn serializes_for_zone_storage() -> VortexResult<()> { + let session = vortex_array::array_session(); + let metadata = GeometryBounds + .serialize(&EmptyOptions)? + .expect("GeometryBounds must be serializable to be stored as a zone statistic"); + GeometryBounds.deserialize(&metadata, &session)?; + Ok(()) + } + + /// The MBR result's corners as `(xmin, ymin, xmax, ymax)`. + fn mbr(result: &Scalar) -> VortexResult<(f64, f64, f64, f64)> { + let fields = result.as_struct(); + let read = |name: &str| -> VortexResult { + f64::try_from( + &fields + .field(name) + .ok_or_else(|| vortex_err!("missing {name}"))?, + ) + }; + Ok((read("xmin")?, read("ymin")?, read("xmax")?, read("ymax")?)) + } + + /// The MBR of a Point column is the min/max of its coordinates, accumulated across batches. + #[test] + fn point_bounds_across_batches() -> VortexResult<()> { + let session = vortex_array::array_session(); + let mut ctx = session.create_execution_ctx(); + + let dtype = point_column(vec![0.0], vec![0.0])?.dtype().clone(); + let mut acc = Accumulator::try_new(GeometryBounds, EmptyOptions, dtype)?; + + acc.accumulate(&point_column(vec![1.0, 3.0], vec![2.0, 4.0])?, &mut ctx)?; + acc.accumulate(&point_column(vec![-1.0], vec![5.0])?, &mut ctx)?; + + assert_eq!(mbr(&acc.finish()?)?, (-1.0, 2.0, 3.0, 5.0)); + Ok(()) + } + + /// The MBR of a Polygon column is the min/max over every ring vertex of every polygon — + /// exercising the `List>` unwrap, not just the bare Point struct. + #[test] + fn polygon_bounds_union_all_vertices() -> VortexResult<()> { + let session = vortex_array::array_session(); + let mut ctx = session.create_execution_ctx(); + + // Two rectangles: (0,0)-(2,3) and (5,5)-(7,8). The chunk MBR is their union: (0,0)-(7,8). + let polygons = polygon_column(vec![ + vec![vec![(0.0, 0.0), (2.0, 0.0), (2.0, 3.0), (0.0, 3.0)]], + vec![vec![(5.0, 5.0), (7.0, 5.0), (7.0, 8.0), (5.0, 8.0)]], + ])?; + let dtype = polygons.dtype().clone(); + let mut acc = Accumulator::try_new(GeometryBounds, EmptyOptions, dtype)?; + acc.accumulate(&polygons, &mut ctx)?; + + assert_eq!(mbr(&acc.finish()?)?, (0.0, 0.0, 7.0, 8.0)); + Ok(()) + } + + /// An empty group yields a null MBR. + #[test] + fn empty_group_is_null() -> VortexResult<()> { + let dtype = point_column(vec![0.0], vec![0.0])?.dtype().clone(); + let mut acc = Accumulator::try_new(GeometryBounds, EmptyOptions, dtype)?; + assert!(acc.finish()?.is_null()); + Ok(()) + } + + /// After `initialize`, the registry yields a default zone statistic for geometry columns (so the + /// zoned writer stores it) but none for ordinary numeric columns. + #[test] + fn registered_as_geometry_zone_default() -> VortexResult<()> { + use vortex_array::aggregate_fn::session::AggregateFnSessionExt; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + + let session = vortex_array::array_session(); + crate::initialize(&session); + + let point_dtype = point_column(vec![0.0], vec![0.0])?.dtype().clone(); + assert!( + !session + .aggregate_fns() + .zone_stat_defaults(&point_dtype) + .is_empty(), + "a geometry zone-stat default should be discovered for Point columns" + ); + let i32_dtype = DType::Primitive(PType::I32, Nullability::NonNullable); + assert!( + session + .aggregate_fns() + .zone_stat_defaults(&i32_dtype) + .is_empty(), + "no geometry zone-stat default should apply to numeric columns" + ); + Ok(()) + } +} diff --git a/vortex-geo/src/aggregate_fn/mod.rs b/vortex-geo/src/aggregate_fn/mod.rs new file mode 100644 index 00000000000..22866cbfc06 --- /dev/null +++ b/vortex-geo/src/aggregate_fn/mod.rs @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Aggregate functions over geometry columns, for use as zone statistics. + +mod bounds; + +pub use bounds::*; diff --git a/vortex-geo/src/extension/mod.rs b/vortex-geo/src/extension/mod.rs index 684c83bade0..44cfb9e1c42 100644 --- a/vortex-geo/src/extension/mod.rs +++ b/vortex-geo/src/extension/mod.rs @@ -19,13 +19,44 @@ use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::StructArray; use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::dtype::DType; use vortex_array::scalar::Scalar; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; pub use wkb::*; +/// Whether `dtype` is a native geometry extension. +pub(crate) fn is_native_geometry(dtype: &DType) -> bool { + dtype + .as_extension_opt() + .is_some_and(|ext| ext.is::() || ext.is::()) +} + +/// The flat coordinate `Struct` of a native geometry column. +pub(crate) fn coordinates(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let Some(ext) = array.dtype().as_extension_opt() else { + vortex_bail!( + "geo: operand is not a geometry extension type, was {}", + array.dtype() + ); + }; + let storage = array + .clone() + .execute::(ctx)? + .storage_array() + .clone(); + if ext.is::() { + point_coordinates(&storage, ctx) + } else if ext.is::() { + polygon_coordinates(&storage, ctx) + } else { + vortex_bail!("geo: unsupported geometry extension {}", array.dtype()) + } +} + /// Decode a native geometry column to `geo_types`. A non-geometry operand is an error. pub(crate) fn geometries( array: &ArrayRef, diff --git a/vortex-geo/src/extension/point.rs b/vortex-geo/src/extension/point.rs index 19e33c212f5..981168f53ed 100644 --- a/vortex-geo/src/extension/point.rs +++ b/vortex-geo/src/extension/point.rs @@ -22,6 +22,7 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::StructArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::arrow::ArrowExport; use vortex_array::arrow::ArrowExportVTable; @@ -96,6 +97,14 @@ fn point_type(geo_metadata: &GeoMetadata, dimension: Dimension) -> PointType { PointType::new(dimension.into(), geoarrow_metadata(geo_metadata)) } +/// The coordinate `Struct` of `Point` storage. +pub(crate) fn point_coordinates( + storage: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { + storage.clone().execute::(ctx) +} + /// Decode `Point` storage to `geo_types` points, for the geo scalar functions. pub(crate) fn point_geometries( storage: &ArrayRef, diff --git a/vortex-geo/src/extension/polygon.rs b/vortex-geo/src/extension/polygon.rs index fc06ce59bd3..a7f0192db5c 100644 --- a/vortex-geo/src/extension/polygon.rs +++ b/vortex-geo/src/extension/polygon.rs @@ -20,10 +20,13 @@ use geoarrow::datatypes::CoordType; use geoarrow::datatypes::PolygonType; use prost::Message; use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::StructArray; use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrays::listview::ListViewArrayExt; use vortex_array::arrow::ArrowExport; use vortex_array::arrow::ArrowExportVTable; use vortex_array::arrow::ArrowImport; @@ -111,6 +114,27 @@ fn polygon_type(geo_metadata: &GeoMetadata, dimension: Dimension) -> PolygonType PolygonType::new(dimension.into(), geoarrow_metadata(geo_metadata)) } +/// The coordinate `Struct` of `Polygon` storage, flattening both `List` levels of +/// `List>` to every vertex of every ring. +pub(crate) fn polygon_coordinates( + storage: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { + // Peel the outer ring list, then each ring's coordinate list, leaving the coordinate struct. + let rings = storage + .clone() + .execute::(ctx)? + .into_listview() + .elements() + .clone(); + let coords = rings + .execute::(ctx)? + .into_listview() + .elements() + .clone(); + coords.execute::(ctx) +} + /// Decode `Polygon` storage (`List>`) to `geo_types` polygons, for the geo scalar /// functions. CRS does not affect planar geometry ops, so default metadata is used. pub(crate) fn polygon_geometries( diff --git a/vortex-geo/src/lib.rs b/vortex-geo/src/lib.rs index 951d93b7b4f..61e4a9ff714 100644 --- a/vortex-geo/src/lib.rs +++ b/vortex-geo/src/lib.rs @@ -3,17 +3,23 @@ use std::sync::Arc; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::arrow::ArrowSessionExt; use vortex_array::dtype::session::DTypeSessionExt; use vortex_array::scalar_fn::session::ScalarFnSessionExt; +use vortex_array::stats::session::StatsSessionExt; use vortex_session::VortexSession; +use crate::aggregate_fn::GeometryBounds; use crate::extension::Point; use crate::extension::Polygon; use crate::extension::WellKnownBinary; +use crate::prune::GeoDistanceBoundsPrune; use crate::scalar_fn::distance::GeoDistance; +pub mod aggregate_fn; pub mod extension; +pub mod prune; pub mod scalar_fn; #[cfg(test)] mod test_harness; @@ -35,4 +41,10 @@ pub fn initialize(session: &VortexSession) { // Register the geometry scalar functions. session.scalar_fns().register(GeoDistance); + + // The bounding-box aggregate; self-declares as a per-chunk zone stat for geometry columns. + session.aggregate_fns().register(GeometryBounds); + + // Register the spatial pruning rule that uses that bounding box. + session.stats().register_rewrite(GeoDistanceBoundsPrune); } diff --git a/vortex-geo/src/prune.rs b/vortex-geo/src/prune.rs new file mode 100644 index 00000000000..8a2604e19b7 --- /dev/null +++ b/vortex-geo/src/prune.rs @@ -0,0 +1,251 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Stats-rewrite pruning for spatial filters, backed by the per-chunk [`GeometryBounds`] MBR. +//! +//! [`GeoDistanceBoundsPrune`] falsifies `ST_Distance(geom, const) <= r` (or `< r`): any row within +//! `r` of the constant must lie inside the constant's bounding box expanded by `r`, so if a chunk's +//! geometry MBR is disjoint from that expanded box, no row in the chunk can match and the chunk is +//! skipped. +//! +//! # Limitations +//! +//! Only the "near" forms `<= r` / `< r` are handled — the predicates a radius/within search uses. +//! They prune via the MBR's *lower* distance bound (the nearest point of the box to `const`). Every +//! other comparison falls through to `None`, leaving the chunk to be scanned — correct, just not +//! pruned: +//! +//! - `> r` / `>= r` are *soundly* prunable via the symmetric upper bound (the farthest corner of the +//! MBR being `<= r` proves every geometry is within `r`), but are intentionally omitted: "far from" +//! filters are rare and rarely selective, so the prune would almost never fire. +//! - `== r` would need both bounds at once and is not a realistic query. +//! - `!= r` is unprunable: a bounding box cannot prove every row sits at exactly distance `r`. + +use geo::BoundingRect; +use vortex_array::VortexSessionExecute; +use vortex_array::aggregate_fn::AggregateFnVTableExt; +use vortex_array::aggregate_fn::EmptyOptions; +use vortex_array::expr::Expression; +use vortex_array::expr::get_item; +use vortex_array::expr::gt; +use vortex_array::expr::is_root; +use vortex_array::expr::lit; +use vortex_array::expr::lt; +use vortex_array::expr::or; +use vortex_array::scalar_fn::ScalarFnId; +use vortex_array::scalar_fn::ScalarFnVTable; +use vortex_array::scalar_fn::fns::binary::Binary; +use vortex_array::scalar_fn::fns::literal::Literal; +use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_array::stats::rewrite::StatsRewriteCtx; +use vortex_array::stats::rewrite::StatsRewriteRule; +use vortex_array::stats::stat; +use vortex_error::VortexResult; + +use crate::aggregate_fn::GeometryBounds; +use crate::extension::single_geometry; +use crate::scalar_fn::distance::GeoDistance; + +/// Prunes chunks for `GeoDistance(geom, const) <= r` / `< r` using the chunk's [`GeometryBounds`] +/// MBR. Registered against the comparison's scalar-function id, since the comparison — not +/// `GeoDistance` — is the predicate root. +#[derive(Debug)] +pub struct GeoDistanceBoundsPrune; + +impl StatsRewriteRule for GeoDistanceBoundsPrune { + fn scalar_fn_id(&self) -> ScalarFnId { + Binary.id() + } + + fn falsify( + &self, + expr: &Expression, + ctx: &StatsRewriteCtx<'_>, + ) -> VortexResult> { + // Only the "near" forms `<= r` / `< r` are pruned; every other comparison is left to the + // scan (see the module-level Limitations for why). + match expr.as_::() { + Operator::Lte | Operator::Lt => {} + _ => return Ok(None), + } + let distance = expr.child(0); + let threshold = expr.child(1); + + // The left operand must be `GeoDistance(geom, const)`. + if distance.as_opt::().is_none() { + return Ok(None); + } + + // Identify the geometry column (the scope root) and the constant geometry operand; distance + // is symmetric, so the constant may be on either side. + let (lhs, rhs) = (distance.child(0), distance.child(1)); + let (geom, constant) = if is_root(lhs) { + (lhs, rhs) + } else if is_root(rhs) { + (rhs, lhs) + } else { + return Ok(None); + }; + + let (Some(constant), Some(radius)) = + (constant.as_opt::(), threshold.as_opt::()) + else { + return Ok(None); + }; + let Ok(radius) = f64::try_from(radius) else { + return Ok(None); + }; + + // Bounding box of the constant geometry, expanded by the radius. + let mut exec = ctx.session().create_execution_ctx(); + let Some(rect) = single_geometry(constant, &mut exec)?.bounding_rect() else { + return Ok(None); + }; + let (xmin, xmax) = (rect.min().x - radius, rect.max().x + radius); + let (ymin, ymax) = (rect.min().y - radius, rect.max().y + radius); + + // Chunk MBR disjoint from the expanded box (on any axis), if no row can match then prune. + let mbr = stat(geom.clone(), GeometryBounds.bind(EmptyOptions)); + let proof = or( + or( + lt(get_item("xmax", mbr.clone()), lit(xmin)), + gt(get_item("xmin", mbr.clone()), lit(xmax)), + ), + or( + lt(get_item("ymax", mbr.clone()), lit(ymin)), + gt(get_item("ymin", mbr), lit(ymax)), + ), + ); + Ok(Some(proof)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::AggregateFnVTableExt; + use vortex_array::aggregate_fn::EmptyOptions as AggregateEmptyOptions; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::StructArray; + use vortex_array::expr::Expression; + use vortex_array::expr::lit; + use vortex_array::expr::lt_eq; + use vortex_array::expr::root; + use vortex_array::scalar_fn::EmptyOptions; + use vortex_array::scalar_fn::ScalarFnVTableExt; + use vortex_array::scalar_fn::fns::binary::Binary; + use vortex_array::scalar_fn::fns::operators::Operator; + use vortex_array::stats::rewrite::StatsRewriteCtx; + use vortex_array::stats::rewrite::StatsRewriteRule; + use vortex_array::validity::Validity; + use vortex_error::VortexResult; + use vortex_layout::layouts::zoned::zone_map::ZoneMap; + + use super::GeoDistanceBoundsPrune; + use crate::aggregate_fn::GeometryBounds; + use crate::scalar_fn::distance::GeoDistance; + use crate::test_harness::point_column; + + /// Run the rule against `GeoDistance(root, origin) 0.5` (operands swapped when + /// `geom_first` is false). Returns the falsity proof, if any. + fn falsify_distance(operator: Operator, geom_first: bool) -> VortexResult> { + let session = vortex_array::array_session(); + crate::initialize(&session); + let mut ctx = session.create_execution_ctx(); + + let scope = point_column(vec![0.0], vec![0.0])?.dtype().clone(); + let origin = point_column(vec![0.0], vec![0.0])?.execute_scalar(0, &mut ctx)?; + let operands = if geom_first { + [root(), lit(origin)] + } else { + [lit(origin), root()] + }; + let distance = GeoDistance.new_expr(EmptyOptions, operands); + let predicate = Binary.new_expr(operator, [distance, lit(0.5f64)]); + + GeoDistanceBoundsPrune.falsify(&predicate, &StatsRewriteCtx::new(&session, &scope)) + } + + /// Only the upper-bounded "near" forms (`<=`/`<`) are pruned; the rest are left to the scan. + #[rstest] + #[case(Operator::Lte, true)] + #[case(Operator::Lt, true)] + #[case(Operator::Gt, false)] + #[case(Operator::Gte, false)] + #[case(Operator::Eq, false)] + #[case(Operator::NotEq, false)] + fn prunes_only_near_distance( + #[case] operator: Operator, + #[case] prunes: bool, + ) -> VortexResult<()> { + assert_eq!(falsify_distance(operator, true)?.is_some(), prunes); + Ok(()) + } + + /// Distance is symmetric: `GeoDistance(const, geom) <= r` falsifies just like the geom-first form. + #[test] + fn falsifies_with_constant_as_left_operand() -> VortexResult<()> { + assert!(falsify_distance(Operator::Lte, false)?.is_some()); + Ok(()) + } + + /// A comparison that does not wrap `GeoDistance` is left untouched. + #[test] + fn ignores_non_distance_comparison() -> VortexResult<()> { + let session = vortex_array::array_session(); + crate::initialize(&session); + let scope = point_column(vec![0.0], vec![0.0])?.dtype().clone(); + + let predicate = lt_eq(lit(1.0f64), lit(2.0f64)); + let ctx = StatsRewriteCtx::new(&session, &scope); + assert!(GeoDistanceBoundsPrune.falsify(&predicate, &ctx)?.is_none()); + Ok(()) + } + + /// `falsify` to `ZoneMap::prune` over a hand-built zone map: the far chunk is skipped, the near + /// one kept. + #[test] + fn prunes_far_chunk_keeps_near() -> VortexResult<()> { + let session = vortex_array::array_session(); + crate::initialize(&session); + let mut ctx = session.create_execution_ctx(); + + let point_dtype = point_column(vec![0.0], vec![0.0])?.dtype().clone(); + let bounds_fn = GeometryBounds.bind(AggregateEmptyOptions); + + // Two chunks: chunk 0 near the origin (MBR 0,0..1,1), chunk 1 far away (MBR 100,100..101,101). + let coord = + |a: f64, b: f64| PrimitiveArray::from_option_iter([Some(a), Some(b)]).into_array(); + let mbrs = StructArray::try_new( + ["xmin", "ymin", "xmax", "ymax"].into(), + vec![ + coord(0.0, 100.0), + coord(0.0, 100.0), + coord(1.0, 101.0), + coord(1.0, 101.0), + ], + 2, + Validity::AllValid, + )? + .into_array(); + let zone_array = StructArray::from_fields(&[(bounds_fn.to_string().as_str(), mbrs)])?; + let zone_map = + ZoneMap::try_new(point_dtype.clone(), zone_array, Arc::new([bounds_fn]), 1, 2)?; + + let origin = point_column(vec![0.0], vec![0.0])?.execute_scalar(0, &mut ctx)?; + let distance = GeoDistance.new_expr(EmptyOptions, [root(), lit(origin)]); + let predicate = lt_eq(distance, lit(0.5f64)); + let proof = predicate + .falsify(&point_dtype, &session)? + .expect("distance filter should be falsifiable"); + + // `true` means the zone is pruned: chunk 0 (near origin) is kept, chunk 1 (far) is skipped. + let mask = zone_map.prune(&proof, &session)?; + assert_eq!(mask.iter().collect::>(), vec![false, true]); + Ok(()) + } +} diff --git a/vortex-geo/src/test_harness.rs b/vortex-geo/src/test_harness.rs index 2e9e7f43c27..da5c3927256 100644 --- a/vortex-geo/src/test_harness.rs +++ b/vortex-geo/src/test_harness.rs @@ -6,31 +6,86 @@ use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::ListArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; +use vortex_array::dtype::Nullability; use vortex_array::dtype::extension::ExtDType; use vortex_array::scalar::Scalar; +use vortex_array::validity::Validity; use vortex_error::VortexResult; +use vortex_error::vortex_err; use crate::extension::GeoMetadata; use crate::extension::Point; +use crate::extension::Polygon; use crate::extension::coordinate::Coordinate; +use crate::extension::coordinate::Dimension; use crate::extension::coordinate::coordinate_from_struct; +use crate::extension::polygon_storage_dtype; -/// A `Point` column (CRS `EPSG:4326`) over the given x/y coordinates. -pub(crate) fn point_column(xs: Vec, ys: Vec) -> VortexResult { - let storage = StructArray::from_fields(&[ +/// The WGS 84 (`EPSG:4326`) metadata tagged onto test geometry columns. +fn wgs84() -> GeoMetadata { + GeoMetadata { + crs: Some("EPSG:4326".to_string()), + } +} + +/// A coordinate `Struct` over the parallel x/y buffers. +fn xy_struct(xs: Vec, ys: Vec) -> VortexResult { + Ok(StructArray::from_fields(&[ ("x", PrimitiveArray::from_iter(xs).into_array()), ("y", PrimitiveArray::from_iter(ys).into_array()), ])? - .into_array(); - let metadata = GeoMetadata { - crs: Some("EPSG:4326".to_string()), - }; - let dtype = ExtDType::::try_new(metadata, storage.dtype().clone())?; + .into_array()) +} + +/// A `Point` column (CRS `EPSG:4326`) over the given x/y coordinates. +pub(crate) fn point_column(xs: Vec, ys: Vec) -> VortexResult { + let storage = xy_struct(xs, ys)?; + let dtype = ExtDType::::try_new(wgs84(), storage.dtype().clone())?; Ok(ExtensionArray::new(dtype.erased(), storage).into_array()) } +/// A `Polygon` column (CRS `EPSG:4326`). Each polygon is a list of rings; each ring a list of +/// `(x, y)` vertices. Stored as `List>>`. +pub(crate) fn polygon_column(polygons: Vec>>) -> VortexResult { + let offset = |n: usize| i32::try_from(n).map_err(|_| vortex_err!("polygon offset overflow")); + + let (mut xs, mut ys) = (Vec::new(), Vec::new()); + let mut ring_offsets = vec![0i32]; + let mut polygon_offsets = vec![0i32]; + for rings in &polygons { + for ring in rings { + for &(x, y) in ring { + xs.push(x); + ys.push(y); + } + ring_offsets.push(offset(xs.len())?); + } + polygon_offsets.push(offset(ring_offsets.len() - 1)?); + } + + let rings = ListArray::try_new( + xy_struct(xs, ys)?, + PrimitiveArray::from_iter(ring_offsets).into_array(), + Validity::NonNullable, + )? + .into_array(); + let storage = ListArray::try_new( + rings, + PrimitiveArray::from_iter(polygon_offsets).into_array(), + Validity::NonNullable, + )? + .into_array(); + + let dtype = ExtDType::::try_new( + wgs84(), + polygon_storage_dtype(Dimension::Xy, Nullability::NonNullable), + )?; + Ok(ExtensionArray::try_new(dtype.erased(), storage)?.into_array()) +} + /// Decode a [`Coordinate`] from an extension-typed point scalar (unwrapped to its coordinate /// storage) or a bare coordinate `Struct` scalar — used to read back a single point in assertions. pub(crate) fn coordinate_from_scalar(scalar: &Scalar) -> VortexResult { diff --git a/vortex-layout/src/layouts/zoned/writer.rs b/vortex-layout/src/layouts/zoned/writer.rs index 7ebca0104e9..317961f0826 100644 --- a/vortex-layout/src/layouts/zoned/writer.rs +++ b/vortex-layout/src/layouts/zoned/writer.rs @@ -25,6 +25,7 @@ use vortex_array::aggregate_fn::fns::min::Min; use vortex_array::aggregate_fn::fns::nan_count::NanCount; use vortex_array::aggregate_fn::fns::null_count::NullCount; use vortex_array::aggregate_fn::fns::sum::Sum; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::dtype::DType; use vortex_error::VortexResult; use vortex_error::vortex_ensure; @@ -106,7 +107,7 @@ impl LayoutStrategy for ZonedStrategy { .options .aggregate_fns .clone() - .unwrap_or_else(|| default_zoned_aggregate_fns(stream.dtype())); + .unwrap_or_else(|| default_zoned_aggregate_fns(stream.dtype(), session)); let session = session.clone(); let stats_accumulator = Arc::new(Mutex::new(AggregateStatsAccumulator::new( @@ -178,7 +179,7 @@ impl LayoutStrategy for ZonedStrategy { } } -fn default_zoned_aggregate_fns(dtype: &DType) -> Arc<[AggregateFnRef]> { +fn default_zoned_aggregate_fns(dtype: &DType, session: &VortexSession) -> Arc<[AggregateFnRef]> { let (max, min) = match dtype { DType::Utf8(_) | DType::Binary(_) => ( BoundedMax.bind(BoundedMaxOptions { @@ -204,6 +205,10 @@ fn default_zoned_aggregate_fns(dtype: &DType) -> Arc<[AggregateFnRef]> { aggregate_fns.push(NanCount.bind(EmptyOptions)); aggregate_fns.push(NullCount.bind(EmptyOptions)); + // The builtin stats above are named directly. Stats from extension crates this one can't depend + // on (e.g. a geometry bounding box) are discovered from the registry at runtime instead. + aggregate_fns.extend(session.aggregate_fns().zone_stat_defaults(dtype)); + aggregate_fns.into() } @@ -223,7 +228,10 @@ mod tests { #[test] fn default_aggregates_bound_variable_length_min_max() { - let aggregate_fns = default_zoned_aggregate_fns(&DType::Utf8(Nullability::NonNullable)); + let aggregate_fns = default_zoned_aggregate_fns( + &DType::Utf8(Nullability::NonNullable), + &vortex_array::array_session(), + ); assert_eq!( aggregate_fns[0].as_::().max_bytes, @@ -237,7 +245,8 @@ mod tests { #[test] fn default_aggregates_keep_fixed_width_min_max_exact() { - let aggregate_fns = default_zoned_aggregate_fns(&PType::I32.into()); + let aggregate_fns = + default_zoned_aggregate_fns(&PType::I32.into(), &vortex_array::array_session()); assert!(aggregate_fns[0].is::()); assert!(aggregate_fns[1].is::()); @@ -249,7 +258,7 @@ mod tests { let dtype = DType::Extension( Timestamp::new(TimeUnit::Microseconds, Nullability::Nullable).erased(), ); - let aggregate_fns = default_zoned_aggregate_fns(&dtype); + let aggregate_fns = default_zoned_aggregate_fns(&dtype, &vortex_array::array_session()); assert!( aggregate_fns