diff --git a/pyiceberg/utils/geospatial.py b/pyiceberg/utils/geospatial.py new file mode 100644 index 0000000000..da337be20d --- /dev/null +++ b/pyiceberg/utils/geospatial.py @@ -0,0 +1,529 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import math +import struct +from dataclasses import dataclass + +_WKB_POINT = 1 +_WKB_LINESTRING = 2 +_WKB_POLYGON = 3 +_WKB_MULTIPOINT = 4 +_WKB_MULTILINESTRING = 5 +_WKB_MULTIPOLYGON = 6 +_WKB_GEOMETRYCOLLECTION = 7 + +_EWKB_Z_FLAG = 0x80000000 +_EWKB_M_FLAG = 0x40000000 +_EWKB_SRID_FLAG = 0x20000000 + +# The 32-byte bound wire format is shared by XYM and XYZM. XYM reserves a NaN +# value in the z slot as a sentinel, so a real NaN z cannot be serialized when +# m is present without losing whether z existed. + + +@dataclass(frozen=True) +class GeospatialBound: + x: float + y: float + z: float | None = None + m: float | None = None + + @property + def has_z(self) -> bool: + return self.z is not None + + @property + def has_m(self) -> bool: + return self.m is not None + + +@dataclass(frozen=True) +class GeometryEnvelope: + x_min: float + y_min: float + z_min: float | None + m_min: float | None + x_max: float + y_max: float + z_max: float | None + m_max: float | None + + def to_min_bound(self) -> GeospatialBound: + return GeospatialBound(x=self.x_min, y=self.y_min, z=self.z_min, m=self.m_min) + + def to_max_bound(self) -> GeospatialBound: + return GeospatialBound(x=self.x_max, y=self.y_max, z=self.z_max, m=self.m_max) + + +def serialize_geospatial_bound(bound: GeospatialBound) -> bytes: + if bound.z is not None and bound.m is not None and math.isnan(bound.z): + raise ValueError("Cannot serialize geospatial bound with NaN z and m; NaN z is reserved as the XYM sentinel") + + if bound.z is None and bound.m is None: + return struct.pack(" GeospatialBound: + if len(raw) == 16: + x, y = struct.unpack(" GeometryEnvelope | None: + reader = _WKBReader(wkb) + accumulator = _EnvelopeAccumulator(is_geography=is_geography) + _parse_geometry(reader, accumulator) + if reader.remaining() != 0: + raise ValueError(f"Trailing bytes found after parsing WKB: {reader.remaining()}") + return accumulator.finish() + + +def merge_envelopes(left: GeometryEnvelope, right: GeometryEnvelope, is_geography: bool) -> GeometryEnvelope: + if is_geography: + x_min, x_max = _merge_longitude_intervals(left.x_min, left.x_max, right.x_min, right.x_max) + else: + x_min, x_max = min(left.x_min, right.x_min), max(left.x_max, right.x_max) + + return GeometryEnvelope( + x_min=x_min, + y_min=min(left.y_min, right.y_min), + z_min=_merge_optional_min(left.z_min, right.z_min), + m_min=_merge_optional_min(left.m_min, right.m_min), + x_max=x_max, + y_max=max(left.y_max, right.y_max), + z_max=_merge_optional_max(left.z_max, right.z_max), + m_max=_merge_optional_max(left.m_max, right.m_max), + ) + + +class GeospatialStatsAggregator: + def __init__(self, is_geography: bool) -> None: + self.is_geography = is_geography + self._envelope: GeometryEnvelope | None = None + + def add(self, wkb: bytes) -> None: + envelope = extract_envelope_from_wkb(wkb, self.is_geography) + if envelope is None: + return + + if self._envelope is None: + self._envelope = envelope + else: + self._envelope = merge_envelopes(self._envelope, envelope, self.is_geography) + + def min_bound(self) -> GeospatialBound | None: + if self._envelope is None: + return None + return self._envelope.to_min_bound() + + def max_bound(self) -> GeospatialBound | None: + if self._envelope is None: + return None + return self._envelope.to_max_bound() + + def serialized_min(self) -> bytes | None: + bound = self.min_bound() + if bound is None: + return None + return serialize_geospatial_bound(bound) + + def serialized_max(self) -> bytes | None: + bound = self.max_bound() + if bound is None: + return None + return serialize_geospatial_bound(bound) + + +def _merge_optional_min(left: float | None, right: float | None) -> float | None: + if left is None: + return right + if right is None: + return left + return min(left, right) + + +def _merge_optional_max(left: float | None, right: float | None) -> float | None: + if left is None: + return right + if right is None: + return left + return max(left, right) + + +@dataclass +class _EnvelopeAccumulator: + is_geography: bool + x_min: float | None = None + y_min: float | None = None + z_min: float | None = None + m_min: float | None = None + x_max: float | None = None + y_max: float | None = None + z_max: float | None = None + m_max: float | None = None + # Geography longitude occupancy is the union of per-EDGE minor-arc spans (in + # circle coordinates [0, 360)). Collecting bare vertices and taking their + # minimal enclosing arc is unsound: it can exclude an edge that connects two + # vertices the "long way" around, dropping rows that lie on that edge. + _segments: list[tuple[float, float]] | None = None + _prev_circle_lon: float | None = None + + def __post_init__(self) -> None: + if self.is_geography: + self._segments = [] + + def start_sequence(self) -> None: + # Edges only connect consecutive vertices WITHIN the same linestring/ring. + # Resetting before each sequence prevents a spurious edge across sub-geometries. + self._prev_circle_lon = None + + def add_point(self, x: float, y: float, z: float | None, m: float | None) -> None: + if math.isnan(x) or math.isnan(y): + return + + if self.is_geography: + if self._segments is None: + self._segments = [] + circle_lon = _to_circle(_normalize_longitude(x)) + # Zero-width span at the vertex itself (covers isolated points). + self._segments.append((circle_lon, circle_lon)) + if self._prev_circle_lon is not None: + self._segments.append(_edge_minor_arc(self._prev_circle_lon, circle_lon)) + self._prev_circle_lon = circle_lon + else: + self.x_min = x if self.x_min is None else min(self.x_min, x) + self.x_max = x if self.x_max is None else max(self.x_max, x) + + self.y_min = y if self.y_min is None else min(self.y_min, y) + self.y_max = y if self.y_max is None else max(self.y_max, y) + + if z is not None and not math.isnan(z): + self.z_min = z if self.z_min is None else min(self.z_min, z) + self.z_max = z if self.z_max is None else max(self.z_max, z) + + if m is not None and not math.isnan(m): + self.m_min = m if self.m_min is None else min(self.m_min, m) + self.m_max = m if self.m_max is None else max(self.m_max, m) + + def finish(self) -> GeometryEnvelope | None: + if self.y_min is None or self.y_max is None: + return None + + if self.is_geography: + if not self._segments: + return None + x_min, x_max = _longitude_interval_from_segments(self._segments) + else: + if self.x_min is None or self.x_max is None: + return None + x_min, x_max = self.x_min, self.x_max + + return GeometryEnvelope( + x_min=x_min, + y_min=self.y_min, + z_min=self.z_min, + m_min=self.m_min, + x_max=x_max, + y_max=self.y_max, + z_max=self.z_max, + m_max=self.m_max, + ) + + +class _WKBReader: + def __init__(self, payload: bytes) -> None: + self._payload = payload + self._offset = 0 + + def remaining(self) -> int: + return len(self._payload) - self._offset + + def read_byte(self) -> int: + self._ensure_size(1) + value = self._payload[self._offset] + self._offset += 1 + return value + + def read_uint32(self, little_endian: bool) -> int: + return int(self._read_fmt("I")) + + def read_double(self, little_endian: bool) -> float: + return float(self._read_fmt("d")) + + def _read_fmt(self, fmt: str) -> float | int: + size = struct.calcsize(fmt) + self._ensure_size(size) + value = struct.unpack_from(fmt, self._payload, self._offset)[0] + self._offset += size + return value + + def _ensure_size(self, expected: int) -> None: + if self._offset + expected > len(self._payload): + raise ValueError("Unexpected end of WKB payload") + + +def _parse_geometry(reader: _WKBReader, accumulator: _EnvelopeAccumulator) -> None: + little_endian = _parse_byte_order(reader.read_byte()) + raw_type = reader.read_uint32(little_endian) + geometry_type, has_z, has_m = _parse_geometry_type(raw_type) + + if raw_type & _EWKB_SRID_FLAG: + reader.read_uint32(little_endian) + + if geometry_type == _WKB_POINT: + _parse_point(reader, accumulator, little_endian, has_z, has_m) + elif geometry_type == _WKB_LINESTRING: + _parse_points(reader, accumulator, little_endian, has_z, has_m) + elif geometry_type == _WKB_POLYGON: + _parse_polygon(reader, accumulator, little_endian, has_z, has_m) + elif geometry_type in (_WKB_MULTIPOINT, _WKB_MULTILINESTRING, _WKB_MULTIPOLYGON, _WKB_GEOMETRYCOLLECTION): + _parse_collection(reader, accumulator, little_endian) + else: + raise ValueError(f"Unsupported WKB geometry type: {geometry_type}") + + +def _parse_byte_order(order: int) -> bool: + if order == 1: + return True + if order == 0: + return False + raise ValueError(f"Unsupported WKB byte order marker: {order}") + + +def _parse_geometry_type(raw_type: int) -> tuple[int, bool, bool]: + has_z = bool(raw_type & _EWKB_Z_FLAG) + has_m = bool(raw_type & _EWKB_M_FLAG) + type_code = raw_type & 0x1FFFFFFF + + if type_code >= 3000: + has_z = True + has_m = True + type_code -= 3000 + elif type_code >= 2000: + has_m = True + type_code -= 2000 + elif type_code >= 1000: + has_z = True + type_code -= 1000 + + return type_code, has_z, has_m + + +def _parse_collection(reader: _WKBReader, accumulator: _EnvelopeAccumulator, little_endian: bool) -> None: + num_geometries = reader.read_uint32(little_endian) + for _ in range(num_geometries): + _parse_geometry(reader, accumulator) + + +def _parse_polygon( + reader: _WKBReader, + accumulator: _EnvelopeAccumulator, + little_endian: bool, + has_z: bool, + has_m: bool, +) -> None: + num_rings = reader.read_uint32(little_endian) + for _ in range(num_rings): + _parse_points(reader, accumulator, little_endian, has_z, has_m) + + +def _parse_points( + reader: _WKBReader, + accumulator: _EnvelopeAccumulator, + little_endian: bool, + has_z: bool, + has_m: bool, +) -> None: + count = reader.read_uint32(little_endian) + accumulator.start_sequence() + for _ in range(count): + x = reader.read_double(little_endian) + y = reader.read_double(little_endian) + if has_z and has_m: + z = reader.read_double(little_endian) + m = reader.read_double(little_endian) + elif has_z: + z = reader.read_double(little_endian) + m = None + elif has_m: + z = None + m = reader.read_double(little_endian) + else: + z = None + m = None + accumulator.add_point(x=x, y=y, z=z, m=m) + + +def _parse_point( + reader: _WKBReader, + accumulator: _EnvelopeAccumulator, + little_endian: bool, + has_z: bool, + has_m: bool, +) -> None: + accumulator.start_sequence() + x = reader.read_double(little_endian) + y = reader.read_double(little_endian) + + if has_z and has_m: + z = reader.read_double(little_endian) + m = reader.read_double(little_endian) + elif has_z: + z = reader.read_double(little_endian) + m = None + elif has_m: + z = None + m = reader.read_double(little_endian) + else: + z = None + m = None + + accumulator.add_point(x=x, y=y, z=z, m=m) + + +def _normalize_longitude(value: float) -> float: + normalized = ((value + 180.0) % 360.0) - 180.0 + if math.isclose(normalized, -180.0) and value > 0: + return 180.0 + return normalized + + +def _to_circle(value: float) -> float: + if math.isclose(value, 180.0): + return 360.0 + return value + 180.0 + + +def _from_circle(value: float) -> float: + if math.isclose(value, 360.0): + return 180.0 + return value - 180.0 + + +def _edge_minor_arc(start_circle: float, end_circle: float) -> tuple[float, float]: + # The longitude span of a geography edge is the SHORTER arc between its two + # endpoints. A geodesic edge whose endpoints differ by < 180 deg of longitude + # stays within that minor arc; at exactly 180 deg the two arcs are equal and + # either choice is sound. Returned as a (possibly wrapping) circle-space arc. + lo, hi = (start_circle, end_circle) if start_circle <= end_circle else (end_circle, start_circle) + direct = hi - lo + if direct <= 180.0: + return lo, hi + # The shorter way wraps across the 0/360 seam (i.e. across the antimeridian). + return hi, lo + 360.0 + + +def _longitude_interval_from_segments(segments: list[tuple[float, float]]) -> tuple[float, float]: + # Converting longitude bounds to/from circle coordinates can introduce tiny + # floating-point drift at the reconstructed interval edges. Pruning callers + # must use geospatial_pruning.bbox_might_match, which applies conservative + # boundary tolerance instead of exact edge comparisons. + normalized: list[tuple[float, float]] = [] + for start, end in segments: + if end <= 360.0: + normalized.append((start, end)) + else: + # Wrapping arc: split at the 0/360 seam into two non-wrapping segments. + normalized.append((start, 360.0)) + normalized.append((0.0, end - 360.0)) + + merged = _merge_segments(normalized) + if not merged: + raise ValueError("Cannot build longitude interval from empty segments") + + largest_gap = -1.0 + gap_start = merged[0][1] + gap_end = merged[0][0] + 360.0 + for idx in range(len(merged)): + current_end = merged[idx][1] + next_start = merged[(idx + 1) % len(merged)][0] + (360.0 if idx == len(merged) - 1 else 0.0) + gap = next_start - current_end + if gap > largest_gap: + largest_gap = gap + gap_start = current_end + gap_end = next_start + + if largest_gap <= 1e-12: + return -180.0, 180.0 + + start = gap_end % 360.0 + end = gap_start % 360.0 + return _from_circle(start), _from_circle(end) + + +def _merge_longitude_intervals(left_min: float, left_max: float, right_min: float, right_max: float) -> tuple[float, float]: + segments = _interval_to_segments(left_min, left_max) + _interval_to_segments(right_min, right_max) + merged = _merge_segments(segments) + if not merged: + raise ValueError("Cannot merge empty longitude intervals") + + largest_gap = -1.0 + gap_start = 0.0 + gap_end = 0.0 + for idx in range(len(merged)): + current_end = merged[idx][1] + next_start = merged[(idx + 1) % len(merged)][0] + (360.0 if idx == len(merged) - 1 else 0.0) + gap = next_start - current_end + if gap > largest_gap: + largest_gap = gap + gap_start = current_end + gap_end = next_start + + if largest_gap <= 1e-12: + return -180.0, 180.0 + + start = gap_end % 360.0 + end = gap_start % 360.0 + return _from_circle(start), _from_circle(end) + + +def _interval_to_segments(x_min: float, x_max: float) -> list[tuple[float, float]]: + start = _to_circle(_normalize_longitude(x_min)) + end = _to_circle(_normalize_longitude(x_max)) + + if x_min <= x_max: + return [(start, end)] + return [(start, 360.0), (0.0, end)] + + +def _merge_segments(segments: list[tuple[float, float]]) -> list[tuple[float, float]]: + if not segments: + return [] + + ordered = sorted(segments, key=lambda pair: pair[0]) + merged: list[tuple[float, float]] = [ordered[0]] + for start, end in ordered[1:]: + previous_start, previous_end = merged[-1] + if start <= previous_end: + merged[-1] = (previous_start, max(previous_end, end)) + else: + merged.append((start, end)) + return merged diff --git a/pyiceberg/utils/geospatial_pruning.py b/pyiceberg/utils/geospatial_pruning.py new file mode 100644 index 0000000000..cabb89199d --- /dev/null +++ b/pyiceberg/utils/geospatial_pruning.py @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Geospatial bounding-box pruning helpers. + +Returns False ONLY when no row in the file can possibly match; any uncertainty +returns True. False negatives (wrongly returning False) would cause silent data loss. +""" + +from __future__ import annotations + +import math + +from pyiceberg.utils.geospatial import deserialize_geospatial_bound, extract_envelope_from_wkb + +_SUPPORTED_PREDICATES = {"st-contains", "st-intersects", "st-overlaps", "st-within"} +_BOUNDARY_ABSOLUTE_EPSILON = 1e-7 +_BOUNDARY_RELATIVE_EPSILON = 1e-12 + + +def bbox_might_match( + predicate: str, + query_wkb: bytes, + lower_bound: bytes | None, + upper_bound: bytes | None, + is_geography: bool, +) -> bool: + if predicate not in _SUPPORTED_PREDICATES: + raise ValueError(f"Unsupported geospatial predicate for bbox pruning: {predicate}") + + if lower_bound is None or upper_bound is None: + return True + + query_envelope = extract_envelope_from_wkb(query_wkb, is_geography) + if query_envelope is None: + return True + + lower = deserialize_geospatial_bound(lower_bound) + upper = deserialize_geospatial_bound(upper_bound) + + # A non-finite (NaN/inf) stored bound is malformed/unknown. NaN comparisons are + # always false, which would silently prune to False, so fall back to "might match". + if not all(math.isfinite(value) for value in (lower.x, lower.y, upper.x, upper.y)): + return True + + y_overlaps = _scalar_intervals_overlap(lower.y, upper.y, query_envelope.y_min, query_envelope.y_max) + if not y_overlaps: + return False + + if is_geography: + x_overlaps = _longitude_intervals_overlap(lower.x, upper.x, query_envelope.x_min, query_envelope.x_max) + else: + x_overlaps = _scalar_intervals_overlap(lower.x, upper.x, query_envelope.x_min, query_envelope.x_max) + + return x_overlaps + + +def _scalar_intervals_overlap(left_min: float, left_max: float, right_min: float, right_max: float) -> bool: + left_start = min(left_min, left_max) + left_end = max(left_min, left_max) + right_start = min(right_min, right_max) + right_end = max(right_min, right_max) + + # BBox pruning must be conservative: returning False can drop matching rows. + # Geography bounds round-trip longitude through circle coordinates, which can + # drift stored edges inward by tiny double-precision amounts. Expand only the + # boundary comparisons so edge-equal queries remain "might match". + epsilon = _interval_boundary_epsilon(left_start, left_end, right_start, right_end) + return left_start <= right_end + epsilon and right_start <= left_end + epsilon + + +def _interval_boundary_epsilon(*values: float) -> float: + finite_values = (abs(value) for value in values if math.isfinite(value)) + scale = max(finite_values, default=1.0) + return _BOUNDARY_ABSOLUTE_EPSILON + (_BOUNDARY_RELATIVE_EPSILON * scale) + + +def _longitude_intervals_overlap(left_min: float, left_max: float, right_min: float, right_max: float) -> bool: + left_segments = _longitude_interval_to_segments(left_min, left_max) + right_segments = _longitude_interval_to_segments(right_min, right_max) + + return any( + _longitude_segments_overlap(left_segment, right_segment) + for left_segment in left_segments + for right_segment in right_segments + ) + + +def _longitude_segments_overlap(left_segment: tuple[float, float], right_segment: tuple[float, float]) -> bool: + left_start, left_end = left_segment + right_start, right_end = right_segment + + return any( + _scalar_intervals_overlap(left_start, left_end, right_start + shift, right_end + shift) for shift in (-360.0, 0.0, 360.0) + ) + + +def _longitude_interval_to_segments(x_min: float, x_max: float) -> list[tuple[float, float]]: + start = _longitude_to_circle(x_min) + end = _longitude_to_circle(x_max) + + if _is_full_longitude_interval(x_min, x_max): + return [(0.0, 360.0)] + + if x_min <= x_max: + return [(start, end)] + + return [(start, 360.0), (0.0, end)] + + +def _is_full_longitude_interval(x_min: float, x_max: float) -> bool: + return math.isclose(_normalize_longitude(x_min), -180.0) and math.isclose(_normalize_longitude(x_max), 180.0) + + +def _normalize_longitude(value: float) -> float: + normalized = ((value + 180.0) % 360.0) - 180.0 + if math.isclose(normalized, -180.0) and value > 0: + return 180.0 + return normalized + + +def _longitude_to_circle(value: float) -> float: + normalized = _normalize_longitude(value) + if math.isclose(normalized, 180.0): + return 360.0 + return normalized + 180.0 diff --git a/tests/utils/test_geospatial.py b/tests/utils/test_geospatial.py new file mode 100644 index 0000000000..2caecc447c --- /dev/null +++ b/tests/utils/test_geospatial.py @@ -0,0 +1,161 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import math +import struct + +import pytest + +from pyiceberg.utils.geospatial import ( + GeospatialBound, + deserialize_geospatial_bound, + extract_envelope_from_wkb, + merge_envelopes, + serialize_geospatial_bound, +) + + +def test_geospatial_bound_serde_xy() -> None: + raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0)) + assert len(raw) == 16 + bound = deserialize_geospatial_bound(raw) + assert bound.x == 10.0 + assert bound.y == 20.0 + assert bound.z is None + assert bound.m is None + + +def test_geospatial_bound_serde_xyz() -> None: + raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, z=30.0)) + assert len(raw) == 24 + bound = deserialize_geospatial_bound(raw) + assert bound.x == 10.0 + assert bound.y == 20.0 + assert bound.z == 30.0 + assert bound.m is None + + +def test_geospatial_bound_serde_xym() -> None: + raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, m=40.0)) + assert len(raw) == 32 + x, y, z, m = struct.unpack(" None: + raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, z=30.0, m=40.0)) + assert len(raw) == 32 + bound = deserialize_geospatial_bound(raw) + assert bound.x == 10.0 + assert bound.y == 20.0 + assert bound.z == 30.0 + assert bound.m == 40.0 + + +def test_geospatial_bound_serde_rejects_ambiguous_nan_z_with_m() -> None: + with pytest.raises(ValueError, match="NaN z"): + serialize_geospatial_bound(GeospatialBound(x=1.0, y=2.0, z=math.nan, m=5.0)) + + xym = GeospatialBound(x=1.0, y=2.0, z=None, m=5.0) + assert deserialize_geospatial_bound(serialize_geospatial_bound(xym)) == xym + + xyzm = GeospatialBound(x=1.0, y=2.0, z=3.0, m=5.0) + assert deserialize_geospatial_bound(serialize_geospatial_bound(xyzm)) == xyzm + + +def test_extract_envelope_geometry() -> None: + # LINESTRING(170 0, -170 1) + wkb = struct.pack(" None: + # LINESTRING(170 0, -170 1) + wkb = struct.pack(" envelope.x_max + assert envelope.x_min == 170.0 + assert envelope.x_max == -170.0 + assert envelope.y_min == 0.0 + assert envelope.y_max == 1.0 + + +def test_extract_envelope_xyzm_linestring() -> None: + # LINESTRING ZM (0 1 2 3, 4 5 6 7) + wkb = struct.pack(" None: + # LINESTRING(-120 0, 0 0, 120 0): the connected edges occupy the -120..120 arc + # through 0. A vertex-set minimal-arc heuristic would instead wrap and exclude a + # real edge (e.g. the segment containing longitude -60), dropping matching rows. + wkb = struct.pack(" None: + # MULTIPOINT(-120 0, 0 0, 120 0): no edges connect the points, so the minimal + # enclosing arc is the short way (wrapping the antimeridian) since the largest + # gap among the three points is the -120..120 span through 0. + point = lambda x, y: struct.pack(" envelope.x_max # wraps: 120 .. -120 across +-180 + + +def test_merge_geography_envelopes() -> None: + left = extract_envelope_from_wkb(struct.pack(" merged.x_max + assert merged.x_min == 170.0 + assert merged.x_max == -120.0 + assert merged.y_min == 0.0 + assert merged.y_max == 3.0 diff --git a/tests/utils/test_geospatial_pruning.py b/tests/utils/test_geospatial_pruning.py new file mode 100644 index 0000000000..70389ff387 --- /dev/null +++ b/tests/utils/test_geospatial_pruning.py @@ -0,0 +1,385 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import struct +from random import Random + +import pytest + +from pyiceberg.utils.geospatial import ( + GeospatialBound, + GeospatialStatsAggregator, + extract_envelope_from_wkb, + serialize_geospatial_bound, +) +from pyiceberg.utils.geospatial_pruning import bbox_might_match + +_SUPPORTED_PREDICATES = ("st-contains", "st-intersects", "st-overlaps", "st-within") + + +def _point_wkb(x: float, y: float) -> bytes: + return struct.pack(" bytes: + return struct.pack(">BIdd", 0, 1, x, y) + + +def _point_xyzm_wkb(x: float, y: float, z: float, m: float) -> bytes: + return struct.pack(" bytes: + ordinates = [ordinate for point in points for ordinate in point] + return struct.pack(" bytes: + points = [ + (x_min, y_min), + (x_max, y_min), + (x_max, y_max), + (x_min, y_max), + (x_min, y_min), + ] + ordinates = [ordinate for point in points for ordinate in point] + return struct.pack(" bytes: + return serialize_geospatial_bound(GeospatialBound(x=x, y=y, z=z, m=m)) + + +def _bounds(x_min: float, y_min: float, x_max: float, y_max: float) -> tuple[bytes, bytes]: + return _bound(x_min, y_min), _bound(x_max, y_max) + + +def _stats_bounds(points: list[tuple[float, float]], is_geography: bool) -> tuple[bytes, bytes]: + aggregator = GeospatialStatsAggregator(is_geography=is_geography) + for x, y in points: + aggregator.add(_point_wkb(x, y)) + + lower_bound = aggregator.serialized_min() + upper_bound = aggregator.serialized_max() + assert lower_bound is not None + assert upper_bound is not None + return lower_bound, upper_bound + + +@pytest.mark.parametrize("predicate", ["st-intersects", "st-overlaps", "st-contains"]) +def test_bbox_pruning_disjoint_geometry_returns_false(predicate: str) -> None: + lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0) + query_wkb = _polygon_bbox_wkb(20.0, 20.0, 30.0, 30.0) + + assert not bbox_might_match(predicate, query_wkb, lower_bound, upper_bound, is_geography=False) + + +def test_bbox_pruning_overlapping_geometry_returns_true() -> None: + lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0) + query_wkb = _polygon_bbox_wkb(5.0, 5.0, 15.0, 15.0) + + assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False) + + +@pytest.mark.parametrize(("lower_bound", "upper_bound"), [(None, _bound(10.0, 10.0)), (_bound(0.0, 0.0), None)]) +def test_bbox_pruning_missing_bounds_returns_true(lower_bound: bytes | None, upper_bound: bytes | None) -> None: + assert bbox_might_match("st-intersects", _point_wkb(100.0, 100.0), lower_bound, upper_bound, is_geography=False) + + +def test_bbox_pruning_within_disjoint_returns_false() -> None: + lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0) + query_wkb = _polygon_bbox_wkb(20.0, 20.0, 30.0, 30.0) + + assert not bbox_might_match("st-within", query_wkb, lower_bound, upper_bound, is_geography=False) + + +def test_bbox_pruning_within_overlapping_returns_true() -> None: + lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0) + query_wkb = _polygon_bbox_wkb(5.0, 5.0, 15.0, 15.0) + + assert bbox_might_match("st-within", query_wkb, lower_bound, upper_bound, is_geography=False) + + +def test_bbox_pruning_antimeridian_wrapped_file_matches_query_inside_interval() -> None: + lower_bound, upper_bound = _bounds(170.0, -10.0, -170.0, 10.0) + + assert bbox_might_match("st-intersects", _point_wkb(178.0, 0.0), lower_bound, upper_bound, is_geography=True) + + +def test_bbox_pruning_antimeridian_wrapped_file_prunes_query_outside_interval() -> None: + lower_bound, upper_bound = _bounds(170.0, -10.0, -170.0, 10.0) + + assert not bbox_might_match("st-intersects", _point_wkb(0.0, 0.0), lower_bound, upper_bound, is_geography=True) + + +@pytest.mark.parametrize( + ("file_x_min", "file_x_max", "query_x"), + [ + (170.0, 180.0, 180.0), + (170.0, 180.0, -180.0), + (180.0, 180.0, 180.0), + (180.0, 180.0, -180.0), + (-180.0, -180.0, 180.0), + (-180.0, -180.0, -180.0), + ], +) +def test_bbox_pruning_geography_antimeridian_seam_is_closed( + file_x_min: float, + file_x_max: float, + query_x: float, +) -> None: + lower_bound, upper_bound = _bounds(file_x_min, 0.0, file_x_max, 10.0) + + assert bbox_might_match("st-intersects", _point_wkb(query_x, 5.0), lower_bound, upper_bound, is_geography=True) + + +@pytest.mark.parametrize( + ("file_x_min", "file_y_min", "file_x_max", "file_y_max", "query_x", "query_y"), + [ + (170.0, 0.0, 180.0, 10.0, 0.0, 5.0), + (170.0, 0.0, 180.0, 10.0, -90.0, 5.0), + (180.0, 0.0, 180.0, 10.0, 179.0, 5.0), + (-180.0, 0.0, -180.0, 10.0, -179.0, 5.0), + (170.0, 0.0, -170.0, 10.0, 0.0, 5.0), + ], +) +def test_bbox_pruning_geography_negative_mutation_guard( + file_x_min: float, + file_y_min: float, + file_x_max: float, + file_y_max: float, + query_x: float, + query_y: float, +) -> None: + # Mutation guard: keep at least five False assertions so a return-True stub fails loudly. + lower_bound, upper_bound = _bounds(file_x_min, file_y_min, file_x_max, file_y_max) + + assert not bbox_might_match("st-intersects", _point_wkb(query_x, query_y), lower_bound, upper_bound, is_geography=True) + + +def test_bbox_pruning_big_endian_point_wkb_extracts_and_prunes() -> None: + query_wkb = _big_endian_point_wkb(5.0, 6.0) + envelope = extract_envelope_from_wkb(query_wkb, is_geography=False) + + assert envelope is not None + assert envelope.x_min == 5.0 + assert envelope.x_max == 5.0 + assert envelope.y_min == 6.0 + assert envelope.y_max == 6.0 + + matching_lower_bound, matching_upper_bound = _bounds(0.0, 0.0, 10.0, 10.0) + disjoint_lower_bound, disjoint_upper_bound = _bounds(20.0, 0.0, 30.0, 10.0) + + assert bbox_might_match( + "st-intersects", + query_wkb, + matching_lower_bound, + matching_upper_bound, + is_geography=False, + ) + assert not bbox_might_match( + "st-intersects", + query_wkb, + disjoint_lower_bound, + disjoint_upper_bound, + is_geography=False, + ) + + +def test_bbox_pruning_false_negative_guard_for_points_inside_wrapped_file_bbox() -> None: + lower_bound, upper_bound = _bounds(170.0, -20.0, -160.0, 20.0) + + for longitude, latitude in [ + (170.0, -20.0), + (174.5, 3.0), + (179.0, 19.0), + (180.0, 0.0), + (-179.5, -7.0), + (-170.0, 20.0), + (-160.0, 0.5), + ]: + assert bbox_might_match( + "st-intersects", + _point_wkb(longitude, latitude), + lower_bound, + upper_bound, + is_geography=True, + ) + + +def test_bbox_pruning_ignores_z_and_m_dimensions() -> None: + lower_bound = _bound(0.0, 0.0, z=0.0, m=0.0) + upper_bound = _bound(10.0, 10.0, z=1.0, m=1.0) + query_wkb = _point_xyzm_wkb(5.0, 5.0, 999.0, 999.0) + + assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False) + + +@pytest.mark.parametrize("is_geography", [False, True]) +@pytest.mark.parametrize( + ("lower", "upper"), + [ + (GeospatialBound(x=float("nan"), y=0.0), GeospatialBound(x=10.0, y=10.0)), + (GeospatialBound(x=0.0, y=float("nan")), GeospatialBound(x=10.0, y=10.0)), + (GeospatialBound(x=0.0, y=0.0), GeospatialBound(x=float("nan"), y=10.0)), + (GeospatialBound(x=0.0, y=0.0), GeospatialBound(x=10.0, y=float("inf"))), + ], +) +def test_bbox_pruning_non_finite_bound_is_treated_as_might_match( + is_geography: bool, lower: GeospatialBound, upper: GeospatialBound +) -> None: + # Safety contract: a malformed/non-finite stored bound must never prune to False. + lower_bound = serialize_geospatial_bound(lower) + upper_bound = serialize_geospatial_bound(upper) + + assert bbox_might_match("st-intersects", _point_wkb(5.0, 5.0), lower_bound, upper_bound, is_geography=is_geography) + + +def test_bbox_pruning_empty_query_returns_true() -> None: + lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0) + query_wkb = _linestring_wkb([]) + + assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False) + + +def test_bbox_pruning_geography_preserves_file_edge_point_after_circle_round_trip() -> None: + points = [(-9.32, 29.55), (-158.16, 36.27), (52.97, 88.76)] + lower_bound, upper_bound = _stats_bounds(points, is_geography=True) + + assert bbox_might_match( + "st-intersects", + _point_wkb(-158.16, 36.27), + lower_bound, + upper_bound, + is_geography=True, + ) + + +@pytest.mark.parametrize( + ("is_geography", "lon_range", "lat_range"), + [ + (True, (-180.0, 180.0), (-90.0, 90.0)), + (False, (-1_000_000.0, 1_000_000.0), (-1_000_000.0, 1_000_000.0)), + ], +) +def test_bbox_pruning_never_prunes_files_own_points( + is_geography: bool, + lon_range: tuple[float, float], + lat_range: tuple[float, float], +) -> None: + rng = Random(1729 if is_geography else 2718) + false_negatives: list[tuple[list[tuple[float, float]], tuple[float, float], str]] = [] + + for _ in range(2000): + points = [ + ( + rng.uniform(*lon_range), + rng.uniform(*lat_range), + ) + for _ in range(rng.randint(1, 5)) + ] + lower_bound, upper_bound = _stats_bounds(points, is_geography=is_geography) + + for point in points: + query_wkb = _point_wkb(*point) + for predicate in _SUPPORTED_PREDICATES: + if not bbox_might_match( + predicate, + query_wkb, + lower_bound, + upper_bound, + is_geography=is_geography, + ): + false_negatives.append((points, point, predicate)) + + assert false_negatives == [] + + +def _norm_longitude(value: float) -> float: + return ((value + 180.0) % 360.0) - 180.0 + + +def _interpolate_along_minor_arc(start: tuple[float, float], end: tuple[float, float], fraction: float) -> tuple[float, float]: + start_circle = _norm_longitude(start[0]) + 180.0 + end_circle = _norm_longitude(end[0]) + 180.0 + delta = end_circle - start_circle + if delta > 180.0: + delta -= 360.0 + if delta < -180.0: + delta += 360.0 + longitude = _norm_longitude((start_circle + delta * fraction) - 180.0) + latitude = start[1] + (end[1] - start[1]) * fraction + return longitude, latitude + + +def test_bbox_pruning_geography_keeps_point_on_excluded_edge() -> None: + # Regression: LINESTRING(-120 0, 0 0, 120 0) contains POINT(-60 0) on its first + # edge. A vertex-set minimal-arc bound wrongly wrapped and pruned this point. + line = struct.pack(" None: + rng = Random(20240623) + false_negatives: list[tuple[list[tuple[float, float]], tuple[float, float], str]] = [] + + for _ in range(2000): + vertices = [(rng.uniform(-180.0, 180.0), rng.uniform(-85.0, 85.0)) for _ in range(rng.randint(2, 5))] + aggregator = GeospatialStatsAggregator(is_geography=True) + aggregator.add(_linestring_wkb(vertices)) + lower_bound = aggregator.serialized_min() + upper_bound = aggregator.serialized_max() + assert lower_bound is not None + assert upper_bound is not None + + for index in range(len(vertices) - 1): + for fraction in (0.0, 0.25, 0.5, 0.75, 1.0): + sample = _interpolate_along_minor_arc(vertices[index], vertices[index + 1], fraction) + for predicate in _SUPPORTED_PREDICATES: + if not bbox_might_match( + predicate, + _point_wkb(*sample), + lower_bound, + upper_bound, + is_geography=True, + ): + false_negatives.append((vertices, sample, predicate)) + + assert false_negatives == [] + + +def test_bbox_pruning_geography_still_prunes_clearly_disjoint_query() -> None: + lower_bound, upper_bound = _stats_bounds( + [(-1.0, -1.0), (0.0, 0.0), (1.0, 1.0)], + is_geography=True, + ) + + assert not bbox_might_match("st-intersects", _point_wkb(170.0, 80.0), lower_bound, upper_bound, is_geography=True) diff --git a/tests/utils/test_geospatial_stats.py b/tests/utils/test_geospatial_stats.py new file mode 100644 index 0000000000..a8d375e36a --- /dev/null +++ b/tests/utils/test_geospatial_stats.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import struct + +from pyiceberg.utils.geospatial import GeospatialStatsAggregator, deserialize_geospatial_bound + + +def _point_wkb(x: float, y: float) -> bytes: + return struct.pack(" bytes: + ordinates = [ordinate for point in points for ordinate in point] + return struct.pack(" None: + aggregator = GeospatialStatsAggregator(is_geography=False) + + aggregator.add(_point_wkb(10.0, -3.0)) + aggregator.add(_linestring_xyzm_wkb([(-2.0, 4.0, 7.0, 3.0), (6.0, 9.0, 2.0, 8.0)])) + + min_bound = aggregator.min_bound() + max_bound = aggregator.max_bound() + assert min_bound is not None + assert max_bound is not None + assert min_bound.x == -2.0 + assert min_bound.y == -3.0 + assert min_bound.z == 2.0 + assert min_bound.m == 3.0 + assert max_bound.x == 10.0 + assert max_bound.y == 9.0 + assert max_bound.z == 7.0 + assert max_bound.m == 8.0 + + serialized_min = aggregator.serialized_min() + serialized_max = aggregator.serialized_max() + assert serialized_min is not None + assert serialized_max is not None + assert deserialize_geospatial_bound(serialized_min) == aggregator.min_bound() + assert deserialize_geospatial_bound(serialized_max) == aggregator.max_bound() + + +def test_geospatial_stats_aggregator_geography_wraps_antimeridian() -> None: + aggregator = GeospatialStatsAggregator(is_geography=True) + + aggregator.add(_point_wkb(170.0, 1.0)) + aggregator.add(_point_wkb(-170.0, 2.0)) + + min_bound = aggregator.min_bound() + max_bound = aggregator.max_bound() + + assert min_bound is not None + assert max_bound is not None + assert min_bound.x > max_bound.x + assert min_bound.x == 170.0 + assert max_bound.x == -170.0 + assert min_bound.y == 1.0 + assert max_bound.y == 2.0 + + +def test_geospatial_stats_aggregator_empty_input_has_no_bounds() -> None: + aggregator = GeospatialStatsAggregator(is_geography=False) + + assert aggregator.min_bound() is None + assert aggregator.max_bound() is None + assert aggregator.serialized_min() is None + assert aggregator.serialized_max() is None