diff --git a/pyiceberg/utils/geospatial.py b/pyiceberg/utils/geospatial.py
new file mode 100644
index 0000000000..da337be20d
--- /dev/null
+++ b/pyiceberg/utils/geospatial.py
@@ -0,0 +1,529 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import math
+import struct
+from dataclasses import dataclass
+
+_WKB_POINT = 1
+_WKB_LINESTRING = 2
+_WKB_POLYGON = 3
+_WKB_MULTIPOINT = 4
+_WKB_MULTILINESTRING = 5
+_WKB_MULTIPOLYGON = 6
+_WKB_GEOMETRYCOLLECTION = 7
+
+_EWKB_Z_FLAG = 0x80000000
+_EWKB_M_FLAG = 0x40000000
+_EWKB_SRID_FLAG = 0x20000000
+
+# The 32-byte bound wire format is shared by XYM and XYZM. XYM reserves a NaN
+# value in the z slot as a sentinel, so a real NaN z cannot be serialized when
+# m is present without losing whether z existed.
+
+
+@dataclass(frozen=True)
+class GeospatialBound:
+ x: float
+ y: float
+ z: float | None = None
+ m: float | None = None
+
+ @property
+ def has_z(self) -> bool:
+ return self.z is not None
+
+ @property
+ def has_m(self) -> bool:
+ return self.m is not None
+
+
+@dataclass(frozen=True)
+class GeometryEnvelope:
+ x_min: float
+ y_min: float
+ z_min: float | None
+ m_min: float | None
+ x_max: float
+ y_max: float
+ z_max: float | None
+ m_max: float | None
+
+ def to_min_bound(self) -> GeospatialBound:
+ return GeospatialBound(x=self.x_min, y=self.y_min, z=self.z_min, m=self.m_min)
+
+ def to_max_bound(self) -> GeospatialBound:
+ return GeospatialBound(x=self.x_max, y=self.y_max, z=self.z_max, m=self.m_max)
+
+
+def serialize_geospatial_bound(bound: GeospatialBound) -> bytes:
+ if bound.z is not None and bound.m is not None and math.isnan(bound.z):
+ raise ValueError("Cannot serialize geospatial bound with NaN z and m; NaN z is reserved as the XYM sentinel")
+
+ if bound.z is None and bound.m is None:
+ return struct.pack("
GeospatialBound:
+ if len(raw) == 16:
+ x, y = struct.unpack(" GeometryEnvelope | None:
+ reader = _WKBReader(wkb)
+ accumulator = _EnvelopeAccumulator(is_geography=is_geography)
+ _parse_geometry(reader, accumulator)
+ if reader.remaining() != 0:
+ raise ValueError(f"Trailing bytes found after parsing WKB: {reader.remaining()}")
+ return accumulator.finish()
+
+
+def merge_envelopes(left: GeometryEnvelope, right: GeometryEnvelope, is_geography: bool) -> GeometryEnvelope:
+ if is_geography:
+ x_min, x_max = _merge_longitude_intervals(left.x_min, left.x_max, right.x_min, right.x_max)
+ else:
+ x_min, x_max = min(left.x_min, right.x_min), max(left.x_max, right.x_max)
+
+ return GeometryEnvelope(
+ x_min=x_min,
+ y_min=min(left.y_min, right.y_min),
+ z_min=_merge_optional_min(left.z_min, right.z_min),
+ m_min=_merge_optional_min(left.m_min, right.m_min),
+ x_max=x_max,
+ y_max=max(left.y_max, right.y_max),
+ z_max=_merge_optional_max(left.z_max, right.z_max),
+ m_max=_merge_optional_max(left.m_max, right.m_max),
+ )
+
+
+class GeospatialStatsAggregator:
+ def __init__(self, is_geography: bool) -> None:
+ self.is_geography = is_geography
+ self._envelope: GeometryEnvelope | None = None
+
+ def add(self, wkb: bytes) -> None:
+ envelope = extract_envelope_from_wkb(wkb, self.is_geography)
+ if envelope is None:
+ return
+
+ if self._envelope is None:
+ self._envelope = envelope
+ else:
+ self._envelope = merge_envelopes(self._envelope, envelope, self.is_geography)
+
+ def min_bound(self) -> GeospatialBound | None:
+ if self._envelope is None:
+ return None
+ return self._envelope.to_min_bound()
+
+ def max_bound(self) -> GeospatialBound | None:
+ if self._envelope is None:
+ return None
+ return self._envelope.to_max_bound()
+
+ def serialized_min(self) -> bytes | None:
+ bound = self.min_bound()
+ if bound is None:
+ return None
+ return serialize_geospatial_bound(bound)
+
+ def serialized_max(self) -> bytes | None:
+ bound = self.max_bound()
+ if bound is None:
+ return None
+ return serialize_geospatial_bound(bound)
+
+
+def _merge_optional_min(left: float | None, right: float | None) -> float | None:
+ if left is None:
+ return right
+ if right is None:
+ return left
+ return min(left, right)
+
+
+def _merge_optional_max(left: float | None, right: float | None) -> float | None:
+ if left is None:
+ return right
+ if right is None:
+ return left
+ return max(left, right)
+
+
+@dataclass
+class _EnvelopeAccumulator:
+ is_geography: bool
+ x_min: float | None = None
+ y_min: float | None = None
+ z_min: float | None = None
+ m_min: float | None = None
+ x_max: float | None = None
+ y_max: float | None = None
+ z_max: float | None = None
+ m_max: float | None = None
+ # Geography longitude occupancy is the union of per-EDGE minor-arc spans (in
+ # circle coordinates [0, 360)). Collecting bare vertices and taking their
+ # minimal enclosing arc is unsound: it can exclude an edge that connects two
+ # vertices the "long way" around, dropping rows that lie on that edge.
+ _segments: list[tuple[float, float]] | None = None
+ _prev_circle_lon: float | None = None
+
+ def __post_init__(self) -> None:
+ if self.is_geography:
+ self._segments = []
+
+ def start_sequence(self) -> None:
+ # Edges only connect consecutive vertices WITHIN the same linestring/ring.
+ # Resetting before each sequence prevents a spurious edge across sub-geometries.
+ self._prev_circle_lon = None
+
+ def add_point(self, x: float, y: float, z: float | None, m: float | None) -> None:
+ if math.isnan(x) or math.isnan(y):
+ return
+
+ if self.is_geography:
+ if self._segments is None:
+ self._segments = []
+ circle_lon = _to_circle(_normalize_longitude(x))
+ # Zero-width span at the vertex itself (covers isolated points).
+ self._segments.append((circle_lon, circle_lon))
+ if self._prev_circle_lon is not None:
+ self._segments.append(_edge_minor_arc(self._prev_circle_lon, circle_lon))
+ self._prev_circle_lon = circle_lon
+ else:
+ self.x_min = x if self.x_min is None else min(self.x_min, x)
+ self.x_max = x if self.x_max is None else max(self.x_max, x)
+
+ self.y_min = y if self.y_min is None else min(self.y_min, y)
+ self.y_max = y if self.y_max is None else max(self.y_max, y)
+
+ if z is not None and not math.isnan(z):
+ self.z_min = z if self.z_min is None else min(self.z_min, z)
+ self.z_max = z if self.z_max is None else max(self.z_max, z)
+
+ if m is not None and not math.isnan(m):
+ self.m_min = m if self.m_min is None else min(self.m_min, m)
+ self.m_max = m if self.m_max is None else max(self.m_max, m)
+
+ def finish(self) -> GeometryEnvelope | None:
+ if self.y_min is None or self.y_max is None:
+ return None
+
+ if self.is_geography:
+ if not self._segments:
+ return None
+ x_min, x_max = _longitude_interval_from_segments(self._segments)
+ else:
+ if self.x_min is None or self.x_max is None:
+ return None
+ x_min, x_max = self.x_min, self.x_max
+
+ return GeometryEnvelope(
+ x_min=x_min,
+ y_min=self.y_min,
+ z_min=self.z_min,
+ m_min=self.m_min,
+ x_max=x_max,
+ y_max=self.y_max,
+ z_max=self.z_max,
+ m_max=self.m_max,
+ )
+
+
+class _WKBReader:
+ def __init__(self, payload: bytes) -> None:
+ self._payload = payload
+ self._offset = 0
+
+ def remaining(self) -> int:
+ return len(self._payload) - self._offset
+
+ def read_byte(self) -> int:
+ self._ensure_size(1)
+ value = self._payload[self._offset]
+ self._offset += 1
+ return value
+
+ def read_uint32(self, little_endian: bool) -> int:
+ return int(self._read_fmt("I"))
+
+ def read_double(self, little_endian: bool) -> float:
+ return float(self._read_fmt("d"))
+
+ def _read_fmt(self, fmt: str) -> float | int:
+ size = struct.calcsize(fmt)
+ self._ensure_size(size)
+ value = struct.unpack_from(fmt, self._payload, self._offset)[0]
+ self._offset += size
+ return value
+
+ def _ensure_size(self, expected: int) -> None:
+ if self._offset + expected > len(self._payload):
+ raise ValueError("Unexpected end of WKB payload")
+
+
+def _parse_geometry(reader: _WKBReader, accumulator: _EnvelopeAccumulator) -> None:
+ little_endian = _parse_byte_order(reader.read_byte())
+ raw_type = reader.read_uint32(little_endian)
+ geometry_type, has_z, has_m = _parse_geometry_type(raw_type)
+
+ if raw_type & _EWKB_SRID_FLAG:
+ reader.read_uint32(little_endian)
+
+ if geometry_type == _WKB_POINT:
+ _parse_point(reader, accumulator, little_endian, has_z, has_m)
+ elif geometry_type == _WKB_LINESTRING:
+ _parse_points(reader, accumulator, little_endian, has_z, has_m)
+ elif geometry_type == _WKB_POLYGON:
+ _parse_polygon(reader, accumulator, little_endian, has_z, has_m)
+ elif geometry_type in (_WKB_MULTIPOINT, _WKB_MULTILINESTRING, _WKB_MULTIPOLYGON, _WKB_GEOMETRYCOLLECTION):
+ _parse_collection(reader, accumulator, little_endian)
+ else:
+ raise ValueError(f"Unsupported WKB geometry type: {geometry_type}")
+
+
+def _parse_byte_order(order: int) -> bool:
+ if order == 1:
+ return True
+ if order == 0:
+ return False
+ raise ValueError(f"Unsupported WKB byte order marker: {order}")
+
+
+def _parse_geometry_type(raw_type: int) -> tuple[int, bool, bool]:
+ has_z = bool(raw_type & _EWKB_Z_FLAG)
+ has_m = bool(raw_type & _EWKB_M_FLAG)
+ type_code = raw_type & 0x1FFFFFFF
+
+ if type_code >= 3000:
+ has_z = True
+ has_m = True
+ type_code -= 3000
+ elif type_code >= 2000:
+ has_m = True
+ type_code -= 2000
+ elif type_code >= 1000:
+ has_z = True
+ type_code -= 1000
+
+ return type_code, has_z, has_m
+
+
+def _parse_collection(reader: _WKBReader, accumulator: _EnvelopeAccumulator, little_endian: bool) -> None:
+ num_geometries = reader.read_uint32(little_endian)
+ for _ in range(num_geometries):
+ _parse_geometry(reader, accumulator)
+
+
+def _parse_polygon(
+ reader: _WKBReader,
+ accumulator: _EnvelopeAccumulator,
+ little_endian: bool,
+ has_z: bool,
+ has_m: bool,
+) -> None:
+ num_rings = reader.read_uint32(little_endian)
+ for _ in range(num_rings):
+ _parse_points(reader, accumulator, little_endian, has_z, has_m)
+
+
+def _parse_points(
+ reader: _WKBReader,
+ accumulator: _EnvelopeAccumulator,
+ little_endian: bool,
+ has_z: bool,
+ has_m: bool,
+) -> None:
+ count = reader.read_uint32(little_endian)
+ accumulator.start_sequence()
+ for _ in range(count):
+ x = reader.read_double(little_endian)
+ y = reader.read_double(little_endian)
+ if has_z and has_m:
+ z = reader.read_double(little_endian)
+ m = reader.read_double(little_endian)
+ elif has_z:
+ z = reader.read_double(little_endian)
+ m = None
+ elif has_m:
+ z = None
+ m = reader.read_double(little_endian)
+ else:
+ z = None
+ m = None
+ accumulator.add_point(x=x, y=y, z=z, m=m)
+
+
+def _parse_point(
+ reader: _WKBReader,
+ accumulator: _EnvelopeAccumulator,
+ little_endian: bool,
+ has_z: bool,
+ has_m: bool,
+) -> None:
+ accumulator.start_sequence()
+ x = reader.read_double(little_endian)
+ y = reader.read_double(little_endian)
+
+ if has_z and has_m:
+ z = reader.read_double(little_endian)
+ m = reader.read_double(little_endian)
+ elif has_z:
+ z = reader.read_double(little_endian)
+ m = None
+ elif has_m:
+ z = None
+ m = reader.read_double(little_endian)
+ else:
+ z = None
+ m = None
+
+ accumulator.add_point(x=x, y=y, z=z, m=m)
+
+
+def _normalize_longitude(value: float) -> float:
+ normalized = ((value + 180.0) % 360.0) - 180.0
+ if math.isclose(normalized, -180.0) and value > 0:
+ return 180.0
+ return normalized
+
+
+def _to_circle(value: float) -> float:
+ if math.isclose(value, 180.0):
+ return 360.0
+ return value + 180.0
+
+
+def _from_circle(value: float) -> float:
+ if math.isclose(value, 360.0):
+ return 180.0
+ return value - 180.0
+
+
+def _edge_minor_arc(start_circle: float, end_circle: float) -> tuple[float, float]:
+ # The longitude span of a geography edge is the SHORTER arc between its two
+ # endpoints. A geodesic edge whose endpoints differ by < 180 deg of longitude
+ # stays within that minor arc; at exactly 180 deg the two arcs are equal and
+ # either choice is sound. Returned as a (possibly wrapping) circle-space arc.
+ lo, hi = (start_circle, end_circle) if start_circle <= end_circle else (end_circle, start_circle)
+ direct = hi - lo
+ if direct <= 180.0:
+ return lo, hi
+ # The shorter way wraps across the 0/360 seam (i.e. across the antimeridian).
+ return hi, lo + 360.0
+
+
+def _longitude_interval_from_segments(segments: list[tuple[float, float]]) -> tuple[float, float]:
+ # Converting longitude bounds to/from circle coordinates can introduce tiny
+ # floating-point drift at the reconstructed interval edges. Pruning callers
+ # must use geospatial_pruning.bbox_might_match, which applies conservative
+ # boundary tolerance instead of exact edge comparisons.
+ normalized: list[tuple[float, float]] = []
+ for start, end in segments:
+ if end <= 360.0:
+ normalized.append((start, end))
+ else:
+ # Wrapping arc: split at the 0/360 seam into two non-wrapping segments.
+ normalized.append((start, 360.0))
+ normalized.append((0.0, end - 360.0))
+
+ merged = _merge_segments(normalized)
+ if not merged:
+ raise ValueError("Cannot build longitude interval from empty segments")
+
+ largest_gap = -1.0
+ gap_start = merged[0][1]
+ gap_end = merged[0][0] + 360.0
+ for idx in range(len(merged)):
+ current_end = merged[idx][1]
+ next_start = merged[(idx + 1) % len(merged)][0] + (360.0 if idx == len(merged) - 1 else 0.0)
+ gap = next_start - current_end
+ if gap > largest_gap:
+ largest_gap = gap
+ gap_start = current_end
+ gap_end = next_start
+
+ if largest_gap <= 1e-12:
+ return -180.0, 180.0
+
+ start = gap_end % 360.0
+ end = gap_start % 360.0
+ return _from_circle(start), _from_circle(end)
+
+
+def _merge_longitude_intervals(left_min: float, left_max: float, right_min: float, right_max: float) -> tuple[float, float]:
+ segments = _interval_to_segments(left_min, left_max) + _interval_to_segments(right_min, right_max)
+ merged = _merge_segments(segments)
+ if not merged:
+ raise ValueError("Cannot merge empty longitude intervals")
+
+ largest_gap = -1.0
+ gap_start = 0.0
+ gap_end = 0.0
+ for idx in range(len(merged)):
+ current_end = merged[idx][1]
+ next_start = merged[(idx + 1) % len(merged)][0] + (360.0 if idx == len(merged) - 1 else 0.0)
+ gap = next_start - current_end
+ if gap > largest_gap:
+ largest_gap = gap
+ gap_start = current_end
+ gap_end = next_start
+
+ if largest_gap <= 1e-12:
+ return -180.0, 180.0
+
+ start = gap_end % 360.0
+ end = gap_start % 360.0
+ return _from_circle(start), _from_circle(end)
+
+
+def _interval_to_segments(x_min: float, x_max: float) -> list[tuple[float, float]]:
+ start = _to_circle(_normalize_longitude(x_min))
+ end = _to_circle(_normalize_longitude(x_max))
+
+ if x_min <= x_max:
+ return [(start, end)]
+ return [(start, 360.0), (0.0, end)]
+
+
+def _merge_segments(segments: list[tuple[float, float]]) -> list[tuple[float, float]]:
+ if not segments:
+ return []
+
+ ordered = sorted(segments, key=lambda pair: pair[0])
+ merged: list[tuple[float, float]] = [ordered[0]]
+ for start, end in ordered[1:]:
+ previous_start, previous_end = merged[-1]
+ if start <= previous_end:
+ merged[-1] = (previous_start, max(previous_end, end))
+ else:
+ merged.append((start, end))
+ return merged
diff --git a/pyiceberg/utils/geospatial_pruning.py b/pyiceberg/utils/geospatial_pruning.py
new file mode 100644
index 0000000000..cabb89199d
--- /dev/null
+++ b/pyiceberg/utils/geospatial_pruning.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Geospatial bounding-box pruning helpers.
+
+Returns False ONLY when no row in the file can possibly match; any uncertainty
+returns True. False negatives (wrongly returning False) would cause silent data loss.
+"""
+
+from __future__ import annotations
+
+import math
+
+from pyiceberg.utils.geospatial import deserialize_geospatial_bound, extract_envelope_from_wkb
+
+_SUPPORTED_PREDICATES = {"st-contains", "st-intersects", "st-overlaps", "st-within"}
+_BOUNDARY_ABSOLUTE_EPSILON = 1e-7
+_BOUNDARY_RELATIVE_EPSILON = 1e-12
+
+
+def bbox_might_match(
+ predicate: str,
+ query_wkb: bytes,
+ lower_bound: bytes | None,
+ upper_bound: bytes | None,
+ is_geography: bool,
+) -> bool:
+ if predicate not in _SUPPORTED_PREDICATES:
+ raise ValueError(f"Unsupported geospatial predicate for bbox pruning: {predicate}")
+
+ if lower_bound is None or upper_bound is None:
+ return True
+
+ query_envelope = extract_envelope_from_wkb(query_wkb, is_geography)
+ if query_envelope is None:
+ return True
+
+ lower = deserialize_geospatial_bound(lower_bound)
+ upper = deserialize_geospatial_bound(upper_bound)
+
+ # A non-finite (NaN/inf) stored bound is malformed/unknown. NaN comparisons are
+ # always false, which would silently prune to False, so fall back to "might match".
+ if not all(math.isfinite(value) for value in (lower.x, lower.y, upper.x, upper.y)):
+ return True
+
+ y_overlaps = _scalar_intervals_overlap(lower.y, upper.y, query_envelope.y_min, query_envelope.y_max)
+ if not y_overlaps:
+ return False
+
+ if is_geography:
+ x_overlaps = _longitude_intervals_overlap(lower.x, upper.x, query_envelope.x_min, query_envelope.x_max)
+ else:
+ x_overlaps = _scalar_intervals_overlap(lower.x, upper.x, query_envelope.x_min, query_envelope.x_max)
+
+ return x_overlaps
+
+
+def _scalar_intervals_overlap(left_min: float, left_max: float, right_min: float, right_max: float) -> bool:
+ left_start = min(left_min, left_max)
+ left_end = max(left_min, left_max)
+ right_start = min(right_min, right_max)
+ right_end = max(right_min, right_max)
+
+ # BBox pruning must be conservative: returning False can drop matching rows.
+ # Geography bounds round-trip longitude through circle coordinates, which can
+ # drift stored edges inward by tiny double-precision amounts. Expand only the
+ # boundary comparisons so edge-equal queries remain "might match".
+ epsilon = _interval_boundary_epsilon(left_start, left_end, right_start, right_end)
+ return left_start <= right_end + epsilon and right_start <= left_end + epsilon
+
+
+def _interval_boundary_epsilon(*values: float) -> float:
+ finite_values = (abs(value) for value in values if math.isfinite(value))
+ scale = max(finite_values, default=1.0)
+ return _BOUNDARY_ABSOLUTE_EPSILON + (_BOUNDARY_RELATIVE_EPSILON * scale)
+
+
+def _longitude_intervals_overlap(left_min: float, left_max: float, right_min: float, right_max: float) -> bool:
+ left_segments = _longitude_interval_to_segments(left_min, left_max)
+ right_segments = _longitude_interval_to_segments(right_min, right_max)
+
+ return any(
+ _longitude_segments_overlap(left_segment, right_segment)
+ for left_segment in left_segments
+ for right_segment in right_segments
+ )
+
+
+def _longitude_segments_overlap(left_segment: tuple[float, float], right_segment: tuple[float, float]) -> bool:
+ left_start, left_end = left_segment
+ right_start, right_end = right_segment
+
+ return any(
+ _scalar_intervals_overlap(left_start, left_end, right_start + shift, right_end + shift) for shift in (-360.0, 0.0, 360.0)
+ )
+
+
+def _longitude_interval_to_segments(x_min: float, x_max: float) -> list[tuple[float, float]]:
+ start = _longitude_to_circle(x_min)
+ end = _longitude_to_circle(x_max)
+
+ if _is_full_longitude_interval(x_min, x_max):
+ return [(0.0, 360.0)]
+
+ if x_min <= x_max:
+ return [(start, end)]
+
+ return [(start, 360.0), (0.0, end)]
+
+
+def _is_full_longitude_interval(x_min: float, x_max: float) -> bool:
+ return math.isclose(_normalize_longitude(x_min), -180.0) and math.isclose(_normalize_longitude(x_max), 180.0)
+
+
+def _normalize_longitude(value: float) -> float:
+ normalized = ((value + 180.0) % 360.0) - 180.0
+ if math.isclose(normalized, -180.0) and value > 0:
+ return 180.0
+ return normalized
+
+
+def _longitude_to_circle(value: float) -> float:
+ normalized = _normalize_longitude(value)
+ if math.isclose(normalized, 180.0):
+ return 360.0
+ return normalized + 180.0
diff --git a/tests/utils/test_geospatial.py b/tests/utils/test_geospatial.py
new file mode 100644
index 0000000000..2caecc447c
--- /dev/null
+++ b/tests/utils/test_geospatial.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import math
+import struct
+
+import pytest
+
+from pyiceberg.utils.geospatial import (
+ GeospatialBound,
+ deserialize_geospatial_bound,
+ extract_envelope_from_wkb,
+ merge_envelopes,
+ serialize_geospatial_bound,
+)
+
+
+def test_geospatial_bound_serde_xy() -> None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0))
+ assert len(raw) == 16
+ bound = deserialize_geospatial_bound(raw)
+ assert bound.x == 10.0
+ assert bound.y == 20.0
+ assert bound.z is None
+ assert bound.m is None
+
+
+def test_geospatial_bound_serde_xyz() -> None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, z=30.0))
+ assert len(raw) == 24
+ bound = deserialize_geospatial_bound(raw)
+ assert bound.x == 10.0
+ assert bound.y == 20.0
+ assert bound.z == 30.0
+ assert bound.m is None
+
+
+def test_geospatial_bound_serde_xym() -> None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, m=40.0))
+ assert len(raw) == 32
+ x, y, z, m = struct.unpack(" None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, z=30.0, m=40.0))
+ assert len(raw) == 32
+ bound = deserialize_geospatial_bound(raw)
+ assert bound.x == 10.0
+ assert bound.y == 20.0
+ assert bound.z == 30.0
+ assert bound.m == 40.0
+
+
+def test_geospatial_bound_serde_rejects_ambiguous_nan_z_with_m() -> None:
+ with pytest.raises(ValueError, match="NaN z"):
+ serialize_geospatial_bound(GeospatialBound(x=1.0, y=2.0, z=math.nan, m=5.0))
+
+ xym = GeospatialBound(x=1.0, y=2.0, z=None, m=5.0)
+ assert deserialize_geospatial_bound(serialize_geospatial_bound(xym)) == xym
+
+ xyzm = GeospatialBound(x=1.0, y=2.0, z=3.0, m=5.0)
+ assert deserialize_geospatial_bound(serialize_geospatial_bound(xyzm)) == xyzm
+
+
+def test_extract_envelope_geometry() -> None:
+ # LINESTRING(170 0, -170 1)
+ wkb = struct.pack(" None:
+ # LINESTRING(170 0, -170 1)
+ wkb = struct.pack(" envelope.x_max
+ assert envelope.x_min == 170.0
+ assert envelope.x_max == -170.0
+ assert envelope.y_min == 0.0
+ assert envelope.y_max == 1.0
+
+
+def test_extract_envelope_xyzm_linestring() -> None:
+ # LINESTRING ZM (0 1 2 3, 4 5 6 7)
+ wkb = struct.pack(" None:
+ # LINESTRING(-120 0, 0 0, 120 0): the connected edges occupy the -120..120 arc
+ # through 0. A vertex-set minimal-arc heuristic would instead wrap and exclude a
+ # real edge (e.g. the segment containing longitude -60), dropping matching rows.
+ wkb = struct.pack(" None:
+ # MULTIPOINT(-120 0, 0 0, 120 0): no edges connect the points, so the minimal
+ # enclosing arc is the short way (wrapping the antimeridian) since the largest
+ # gap among the three points is the -120..120 span through 0.
+ point = lambda x, y: struct.pack(" envelope.x_max # wraps: 120 .. -120 across +-180
+
+
+def test_merge_geography_envelopes() -> None:
+ left = extract_envelope_from_wkb(struct.pack(" merged.x_max
+ assert merged.x_min == 170.0
+ assert merged.x_max == -120.0
+ assert merged.y_min == 0.0
+ assert merged.y_max == 3.0
diff --git a/tests/utils/test_geospatial_pruning.py b/tests/utils/test_geospatial_pruning.py
new file mode 100644
index 0000000000..70389ff387
--- /dev/null
+++ b/tests/utils/test_geospatial_pruning.py
@@ -0,0 +1,385 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import struct
+from random import Random
+
+import pytest
+
+from pyiceberg.utils.geospatial import (
+ GeospatialBound,
+ GeospatialStatsAggregator,
+ extract_envelope_from_wkb,
+ serialize_geospatial_bound,
+)
+from pyiceberg.utils.geospatial_pruning import bbox_might_match
+
+_SUPPORTED_PREDICATES = ("st-contains", "st-intersects", "st-overlaps", "st-within")
+
+
+def _point_wkb(x: float, y: float) -> bytes:
+ return struct.pack(" bytes:
+ return struct.pack(">BIdd", 0, 1, x, y)
+
+
+def _point_xyzm_wkb(x: float, y: float, z: float, m: float) -> bytes:
+ return struct.pack(" bytes:
+ ordinates = [ordinate for point in points for ordinate in point]
+ return struct.pack(" bytes:
+ points = [
+ (x_min, y_min),
+ (x_max, y_min),
+ (x_max, y_max),
+ (x_min, y_max),
+ (x_min, y_min),
+ ]
+ ordinates = [ordinate for point in points for ordinate in point]
+ return struct.pack(" bytes:
+ return serialize_geospatial_bound(GeospatialBound(x=x, y=y, z=z, m=m))
+
+
+def _bounds(x_min: float, y_min: float, x_max: float, y_max: float) -> tuple[bytes, bytes]:
+ return _bound(x_min, y_min), _bound(x_max, y_max)
+
+
+def _stats_bounds(points: list[tuple[float, float]], is_geography: bool) -> tuple[bytes, bytes]:
+ aggregator = GeospatialStatsAggregator(is_geography=is_geography)
+ for x, y in points:
+ aggregator.add(_point_wkb(x, y))
+
+ lower_bound = aggregator.serialized_min()
+ upper_bound = aggregator.serialized_max()
+ assert lower_bound is not None
+ assert upper_bound is not None
+ return lower_bound, upper_bound
+
+
+@pytest.mark.parametrize("predicate", ["st-intersects", "st-overlaps", "st-contains"])
+def test_bbox_pruning_disjoint_geometry_returns_false(predicate: str) -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(20.0, 20.0, 30.0, 30.0)
+
+ assert not bbox_might_match(predicate, query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_overlapping_geometry_returns_true() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(5.0, 5.0, 15.0, 15.0)
+
+ assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+@pytest.mark.parametrize(("lower_bound", "upper_bound"), [(None, _bound(10.0, 10.0)), (_bound(0.0, 0.0), None)])
+def test_bbox_pruning_missing_bounds_returns_true(lower_bound: bytes | None, upper_bound: bytes | None) -> None:
+ assert bbox_might_match("st-intersects", _point_wkb(100.0, 100.0), lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_within_disjoint_returns_false() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(20.0, 20.0, 30.0, 30.0)
+
+ assert not bbox_might_match("st-within", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_within_overlapping_returns_true() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(5.0, 5.0, 15.0, 15.0)
+
+ assert bbox_might_match("st-within", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_antimeridian_wrapped_file_matches_query_inside_interval() -> None:
+ lower_bound, upper_bound = _bounds(170.0, -10.0, -170.0, 10.0)
+
+ assert bbox_might_match("st-intersects", _point_wkb(178.0, 0.0), lower_bound, upper_bound, is_geography=True)
+
+
+def test_bbox_pruning_antimeridian_wrapped_file_prunes_query_outside_interval() -> None:
+ lower_bound, upper_bound = _bounds(170.0, -10.0, -170.0, 10.0)
+
+ assert not bbox_might_match("st-intersects", _point_wkb(0.0, 0.0), lower_bound, upper_bound, is_geography=True)
+
+
+@pytest.mark.parametrize(
+ ("file_x_min", "file_x_max", "query_x"),
+ [
+ (170.0, 180.0, 180.0),
+ (170.0, 180.0, -180.0),
+ (180.0, 180.0, 180.0),
+ (180.0, 180.0, -180.0),
+ (-180.0, -180.0, 180.0),
+ (-180.0, -180.0, -180.0),
+ ],
+)
+def test_bbox_pruning_geography_antimeridian_seam_is_closed(
+ file_x_min: float,
+ file_x_max: float,
+ query_x: float,
+) -> None:
+ lower_bound, upper_bound = _bounds(file_x_min, 0.0, file_x_max, 10.0)
+
+ assert bbox_might_match("st-intersects", _point_wkb(query_x, 5.0), lower_bound, upper_bound, is_geography=True)
+
+
+@pytest.mark.parametrize(
+ ("file_x_min", "file_y_min", "file_x_max", "file_y_max", "query_x", "query_y"),
+ [
+ (170.0, 0.0, 180.0, 10.0, 0.0, 5.0),
+ (170.0, 0.0, 180.0, 10.0, -90.0, 5.0),
+ (180.0, 0.0, 180.0, 10.0, 179.0, 5.0),
+ (-180.0, 0.0, -180.0, 10.0, -179.0, 5.0),
+ (170.0, 0.0, -170.0, 10.0, 0.0, 5.0),
+ ],
+)
+def test_bbox_pruning_geography_negative_mutation_guard(
+ file_x_min: float,
+ file_y_min: float,
+ file_x_max: float,
+ file_y_max: float,
+ query_x: float,
+ query_y: float,
+) -> None:
+ # Mutation guard: keep at least five False assertions so a return-True stub fails loudly.
+ lower_bound, upper_bound = _bounds(file_x_min, file_y_min, file_x_max, file_y_max)
+
+ assert not bbox_might_match("st-intersects", _point_wkb(query_x, query_y), lower_bound, upper_bound, is_geography=True)
+
+
+def test_bbox_pruning_big_endian_point_wkb_extracts_and_prunes() -> None:
+ query_wkb = _big_endian_point_wkb(5.0, 6.0)
+ envelope = extract_envelope_from_wkb(query_wkb, is_geography=False)
+
+ assert envelope is not None
+ assert envelope.x_min == 5.0
+ assert envelope.x_max == 5.0
+ assert envelope.y_min == 6.0
+ assert envelope.y_max == 6.0
+
+ matching_lower_bound, matching_upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ disjoint_lower_bound, disjoint_upper_bound = _bounds(20.0, 0.0, 30.0, 10.0)
+
+ assert bbox_might_match(
+ "st-intersects",
+ query_wkb,
+ matching_lower_bound,
+ matching_upper_bound,
+ is_geography=False,
+ )
+ assert not bbox_might_match(
+ "st-intersects",
+ query_wkb,
+ disjoint_lower_bound,
+ disjoint_upper_bound,
+ is_geography=False,
+ )
+
+
+def test_bbox_pruning_false_negative_guard_for_points_inside_wrapped_file_bbox() -> None:
+ lower_bound, upper_bound = _bounds(170.0, -20.0, -160.0, 20.0)
+
+ for longitude, latitude in [
+ (170.0, -20.0),
+ (174.5, 3.0),
+ (179.0, 19.0),
+ (180.0, 0.0),
+ (-179.5, -7.0),
+ (-170.0, 20.0),
+ (-160.0, 0.5),
+ ]:
+ assert bbox_might_match(
+ "st-intersects",
+ _point_wkb(longitude, latitude),
+ lower_bound,
+ upper_bound,
+ is_geography=True,
+ )
+
+
+def test_bbox_pruning_ignores_z_and_m_dimensions() -> None:
+ lower_bound = _bound(0.0, 0.0, z=0.0, m=0.0)
+ upper_bound = _bound(10.0, 10.0, z=1.0, m=1.0)
+ query_wkb = _point_xyzm_wkb(5.0, 5.0, 999.0, 999.0)
+
+ assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+@pytest.mark.parametrize("is_geography", [False, True])
+@pytest.mark.parametrize(
+ ("lower", "upper"),
+ [
+ (GeospatialBound(x=float("nan"), y=0.0), GeospatialBound(x=10.0, y=10.0)),
+ (GeospatialBound(x=0.0, y=float("nan")), GeospatialBound(x=10.0, y=10.0)),
+ (GeospatialBound(x=0.0, y=0.0), GeospatialBound(x=float("nan"), y=10.0)),
+ (GeospatialBound(x=0.0, y=0.0), GeospatialBound(x=10.0, y=float("inf"))),
+ ],
+)
+def test_bbox_pruning_non_finite_bound_is_treated_as_might_match(
+ is_geography: bool, lower: GeospatialBound, upper: GeospatialBound
+) -> None:
+ # Safety contract: a malformed/non-finite stored bound must never prune to False.
+ lower_bound = serialize_geospatial_bound(lower)
+ upper_bound = serialize_geospatial_bound(upper)
+
+ assert bbox_might_match("st-intersects", _point_wkb(5.0, 5.0), lower_bound, upper_bound, is_geography=is_geography)
+
+
+def test_bbox_pruning_empty_query_returns_true() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _linestring_wkb([])
+
+ assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_geography_preserves_file_edge_point_after_circle_round_trip() -> None:
+ points = [(-9.32, 29.55), (-158.16, 36.27), (52.97, 88.76)]
+ lower_bound, upper_bound = _stats_bounds(points, is_geography=True)
+
+ assert bbox_might_match(
+ "st-intersects",
+ _point_wkb(-158.16, 36.27),
+ lower_bound,
+ upper_bound,
+ is_geography=True,
+ )
+
+
+@pytest.mark.parametrize(
+ ("is_geography", "lon_range", "lat_range"),
+ [
+ (True, (-180.0, 180.0), (-90.0, 90.0)),
+ (False, (-1_000_000.0, 1_000_000.0), (-1_000_000.0, 1_000_000.0)),
+ ],
+)
+def test_bbox_pruning_never_prunes_files_own_points(
+ is_geography: bool,
+ lon_range: tuple[float, float],
+ lat_range: tuple[float, float],
+) -> None:
+ rng = Random(1729 if is_geography else 2718)
+ false_negatives: list[tuple[list[tuple[float, float]], tuple[float, float], str]] = []
+
+ for _ in range(2000):
+ points = [
+ (
+ rng.uniform(*lon_range),
+ rng.uniform(*lat_range),
+ )
+ for _ in range(rng.randint(1, 5))
+ ]
+ lower_bound, upper_bound = _stats_bounds(points, is_geography=is_geography)
+
+ for point in points:
+ query_wkb = _point_wkb(*point)
+ for predicate in _SUPPORTED_PREDICATES:
+ if not bbox_might_match(
+ predicate,
+ query_wkb,
+ lower_bound,
+ upper_bound,
+ is_geography=is_geography,
+ ):
+ false_negatives.append((points, point, predicate))
+
+ assert false_negatives == []
+
+
+def _norm_longitude(value: float) -> float:
+ return ((value + 180.0) % 360.0) - 180.0
+
+
+def _interpolate_along_minor_arc(start: tuple[float, float], end: tuple[float, float], fraction: float) -> tuple[float, float]:
+ start_circle = _norm_longitude(start[0]) + 180.0
+ end_circle = _norm_longitude(end[0]) + 180.0
+ delta = end_circle - start_circle
+ if delta > 180.0:
+ delta -= 360.0
+ if delta < -180.0:
+ delta += 360.0
+ longitude = _norm_longitude((start_circle + delta * fraction) - 180.0)
+ latitude = start[1] + (end[1] - start[1]) * fraction
+ return longitude, latitude
+
+
+def test_bbox_pruning_geography_keeps_point_on_excluded_edge() -> None:
+ # Regression: LINESTRING(-120 0, 0 0, 120 0) contains POINT(-60 0) on its first
+ # edge. A vertex-set minimal-arc bound wrongly wrapped and pruned this point.
+ line = struct.pack(" None:
+ rng = Random(20240623)
+ false_negatives: list[tuple[list[tuple[float, float]], tuple[float, float], str]] = []
+
+ for _ in range(2000):
+ vertices = [(rng.uniform(-180.0, 180.0), rng.uniform(-85.0, 85.0)) for _ in range(rng.randint(2, 5))]
+ aggregator = GeospatialStatsAggregator(is_geography=True)
+ aggregator.add(_linestring_wkb(vertices))
+ lower_bound = aggregator.serialized_min()
+ upper_bound = aggregator.serialized_max()
+ assert lower_bound is not None
+ assert upper_bound is not None
+
+ for index in range(len(vertices) - 1):
+ for fraction in (0.0, 0.25, 0.5, 0.75, 1.0):
+ sample = _interpolate_along_minor_arc(vertices[index], vertices[index + 1], fraction)
+ for predicate in _SUPPORTED_PREDICATES:
+ if not bbox_might_match(
+ predicate,
+ _point_wkb(*sample),
+ lower_bound,
+ upper_bound,
+ is_geography=True,
+ ):
+ false_negatives.append((vertices, sample, predicate))
+
+ assert false_negatives == []
+
+
+def test_bbox_pruning_geography_still_prunes_clearly_disjoint_query() -> None:
+ lower_bound, upper_bound = _stats_bounds(
+ [(-1.0, -1.0), (0.0, 0.0), (1.0, 1.0)],
+ is_geography=True,
+ )
+
+ assert not bbox_might_match("st-intersects", _point_wkb(170.0, 80.0), lower_bound, upper_bound, is_geography=True)
diff --git a/tests/utils/test_geospatial_stats.py b/tests/utils/test_geospatial_stats.py
new file mode 100644
index 0000000000..a8d375e36a
--- /dev/null
+++ b/tests/utils/test_geospatial_stats.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import struct
+
+from pyiceberg.utils.geospatial import GeospatialStatsAggregator, deserialize_geospatial_bound
+
+
+def _point_wkb(x: float, y: float) -> bytes:
+ return struct.pack(" bytes:
+ ordinates = [ordinate for point in points for ordinate in point]
+ return struct.pack(" None:
+ aggregator = GeospatialStatsAggregator(is_geography=False)
+
+ aggregator.add(_point_wkb(10.0, -3.0))
+ aggregator.add(_linestring_xyzm_wkb([(-2.0, 4.0, 7.0, 3.0), (6.0, 9.0, 2.0, 8.0)]))
+
+ min_bound = aggregator.min_bound()
+ max_bound = aggregator.max_bound()
+ assert min_bound is not None
+ assert max_bound is not None
+ assert min_bound.x == -2.0
+ assert min_bound.y == -3.0
+ assert min_bound.z == 2.0
+ assert min_bound.m == 3.0
+ assert max_bound.x == 10.0
+ assert max_bound.y == 9.0
+ assert max_bound.z == 7.0
+ assert max_bound.m == 8.0
+
+ serialized_min = aggregator.serialized_min()
+ serialized_max = aggregator.serialized_max()
+ assert serialized_min is not None
+ assert serialized_max is not None
+ assert deserialize_geospatial_bound(serialized_min) == aggregator.min_bound()
+ assert deserialize_geospatial_bound(serialized_max) == aggregator.max_bound()
+
+
+def test_geospatial_stats_aggregator_geography_wraps_antimeridian() -> None:
+ aggregator = GeospatialStatsAggregator(is_geography=True)
+
+ aggregator.add(_point_wkb(170.0, 1.0))
+ aggregator.add(_point_wkb(-170.0, 2.0))
+
+ min_bound = aggregator.min_bound()
+ max_bound = aggregator.max_bound()
+
+ assert min_bound is not None
+ assert max_bound is not None
+ assert min_bound.x > max_bound.x
+ assert min_bound.x == 170.0
+ assert max_bound.x == -170.0
+ assert min_bound.y == 1.0
+ assert max_bound.y == 2.0
+
+
+def test_geospatial_stats_aggregator_empty_input_has_no_bounds() -> None:
+ aggregator = GeospatialStatsAggregator(is_geography=False)
+
+ assert aggregator.min_bound() is None
+ assert aggregator.max_bound() is None
+ assert aggregator.serialized_min() is None
+ assert aggregator.serialized_max() is None