diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 076098c757..9281c4cc98 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2613,6 +2613,49 @@ def data_file_statistics_from_parquet_metadata(
)
+def _geospatial_bounds_from_arrow(schema: Schema, arrow_table: pa.Table) -> tuple[dict[int, bytes], dict[int, bytes]]:
+ """Compute v3 geospatial lower/upper bounds for any geometry/geography columns.
+
+ Geometry/geography values are stored as WKB. Parquet column statistics cannot
+ bound WKB blobs spatially, so the bounds are derived by extracting each value's
+ envelope. Returns ``(lower_bounds, upper_bounds)`` keyed by field id, serialized
+ per the spec (little-endian doubles). Columns with no extractable geometry are
+ skipped.
+ """
+ from pyiceberg.types import GeographyType, GeometryType
+ from pyiceberg.utils.geospatial import GeospatialStatsAggregator
+
+ lower: dict[int, bytes] = {}
+ upper: dict[int, bytes] = {}
+
+ for field in schema.as_struct().fields:
+ if not isinstance(field.field_type, (GeometryType, GeographyType)):
+ continue
+ if field.name not in arrow_table.column_names:
+ continue
+
+ is_geography = isinstance(field.field_type, GeographyType)
+ aggregator = GeospatialStatsAggregator(is_geography=is_geography)
+ column = arrow_table.column(field.name)
+ # Geometry/geography columns are WKB. When geoarrow-pyarrow is present the
+ # column is a WKB extension type whose storage holds the raw WKB bytes; unwrap
+ # it so the envelope extractor sees bytes rather than parsed geometry objects.
+ for chunk in column.chunks:
+ storage = chunk.storage if isinstance(chunk.type, pa.ExtensionType) else chunk
+ for value in storage.to_pylist():
+ if value is None:
+ continue
+ aggregator.add(bytes(value))
+
+ serialized_min = aggregator.serialized_min()
+ serialized_max = aggregator.serialized_max()
+ if serialized_min is not None and serialized_max is not None:
+ lower[field.field_id] = serialized_min
+ upper[field.field_id] = serialized_max
+
+ return lower, upper
+
+
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
@@ -2660,6 +2703,16 @@ def write_parquet(task: WriteTask) -> DataFile:
stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
)
+ serialized_stats = statistics.to_serialized_dict()
+
+ # v3 geospatial bounds: parquet column stats cannot bound WKB spatially, so
+ # derive lower/upper bounds from the geometry/geography column envelopes and
+ # merge them into the data file bounds.
+ geo_lower, geo_upper = _geospatial_bounds_from_arrow(file_schema, arrow_table)
+ if geo_lower:
+ serialized_stats["lower_bounds"] = {**serialized_stats.get("lower_bounds", {}), **geo_lower}
+ serialized_stats["upper_bounds"] = {**serialized_stats.get("upper_bounds", {}), **geo_upper}
+
data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=file_path,
@@ -2674,7 +2727,7 @@ def write_parquet(task: WriteTask) -> DataFile:
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
- **statistics.to_serialized_dict(),
+ **serialized_stats,
)
return data_file
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 3811a9d894..9a1d5811e2 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -54,7 +54,7 @@
UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
-DEFAULT_READ_VERSION: Literal[2] = 2
+DEFAULT_READ_VERSION: Literal[3] = 3
INITIAL_SEQUENCE_NUMBER = 0
@@ -531,6 +531,46 @@ def equality_ids(self) -> list[int] | None:
def sort_order_id(self) -> int | None:
return self._data[15]
+ @property
+ def first_row_id(self) -> int | None:
+ """The _row_id for the first row in the data file (field 142, v3+ only).
+
+ Note: ``DataFile.from_args`` always allocates the full v3-width record
+ (``len == 20``) regardless of table version, so a v1/v2 file simply holds
+ ``None`` at position 16. The length guard below only matters for records
+ constructed directly from a shorter positional list.
+ """
+ if len(self._data) > 16:
+ return self._data[16]
+ return None
+
+ @first_row_id.setter
+ def first_row_id(self, value: int | None) -> None:
+ if len(self._data) <= 16:
+ raise ValueError("Cannot set first_row_id: record has no field-142 slot")
+ self._data[16] = value
+
+ @property
+ def referenced_data_file(self) -> str | None:
+ """Location of the data file that all deletes in this file reference (field 143, v3+)."""
+ if len(self._data) > 17:
+ return self._data[17]
+ return None
+
+ @property
+ def content_offset(self) -> int | None:
+ """Offset in the file where the deletion-vector blob starts (field 144, v3+)."""
+ if len(self._data) > 18:
+ return self._data[18]
+ return None
+
+ @property
+ def content_size_in_bytes(self) -> int | None:
+ """Length of the referenced deletion-vector blob (field 145, v3+)."""
+ if len(self._data) > 19:
+ return self._data[19]
+ return None
+
# Spec ID should not be stored in the file
_spec_id: int
@@ -549,6 +589,23 @@ def __setattr__(self, name: str, value: Any) -> None:
value = FileFormat[value]
super().__setattr__(name, value)
+ def __copy__(self) -> DataFile:
+ """Return a copy whose ``_data`` list is independent from the original.
+
+ ``Record`` stores its fields in a mutable ``_data`` list, so the default
+ ``copy.copy`` would share that list and a mutation on the copy (e.g.
+ assigning ``first_row_id`` while materializing row lineage on a delete)
+ would leak back into the source DataFile. Copying the list isolates
+ field-level mutations to the copy while preserving ``_spec_id``.
+ """
+ cls = self.__class__
+ new = cls.__new__(cls)
+ new._data = list(self._data)
+ instance_dict = getattr(self, "__dict__", None)
+ if instance_dict:
+ new.__dict__.update(instance_dict)
+ return new
+
def __hash__(self) -> int:
"""Return the hash of the file path."""
return hash(self.file_path)
@@ -852,6 +909,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None:
def key_metadata(self) -> bytes | None:
return self._data[14]
+ @property
+ def first_row_id(self) -> int | None:
+ return self._data[15] if len(self._data) > 15 else None
+
+ @first_row_id.setter
+ def first_row_id(self, value: int | None) -> None:
+ if len(self._data) <= 15:
+ self._data.append(value)
+ else:
+ self._data[15] = value
+
def has_added_files(self) -> bool:
return self.added_files_count is None or self.added_files_count > 0
@@ -1014,6 +1082,7 @@ class ManifestWriter(ABC):
_min_sequence_number: int | None
_partitions: list[Record]
_compression: AvroCompressionCodec
+ _content: ManifestContent
def __init__(
self,
@@ -1022,12 +1091,14 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
+ content: ManifestContent = ManifestContent.DATA,
) -> None:
self.closed = False
self._spec = spec
self._schema = schema
self._output_file = output_file
self._snapshot_id = snapshot_id
+ self._content = content
self._added_files = 0
self._added_rows = 0
@@ -1192,8 +1263,11 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
+ content: ManifestContent = ManifestContent.DATA,
):
- super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
+ if content != ManifestContent.DATA:
+ raise ValueError("Cannot write delete manifests in a v1 table")
+ super().__init__(spec, schema, output_file, snapshot_id, avro_compression, content)
def content(self) -> ManifestContent:
return ManifestContent.DATA
@@ -1214,11 +1288,12 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
+ content: ManifestContent = ManifestContent.DATA,
):
- super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
+ super().__init__(spec, schema, output_file, snapshot_id, avro_compression, content)
def content(self) -> ManifestContent:
- return ManifestContent.DATA
+ return self._content
@property
def version(self) -> TableVersion:
@@ -1228,7 +1303,7 @@ def version(self) -> TableVersion:
def _meta(self) -> dict[str, str]:
return {
**super()._meta,
- "content": "data",
+ "content": "deletes" if self._content == ManifestContent.DELETES else "data",
}
def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
@@ -1240,6 +1315,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
return entry
+class ManifestWriterV3(ManifestWriterV2):
+ @property
+ def version(self) -> TableVersion:
+ return 3
+
+
def write_manifest(
format_version: TableVersion,
spec: PartitionSpec,
@@ -1247,11 +1328,14 @@ def write_manifest(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
+ content: ManifestContent = ManifestContent.DATA,
) -> ManifestWriter:
if format_version == 1:
- return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
+ return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression, content)
elif format_version == 2:
- return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
+ return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression, content)
+ elif format_version == 3:
+ return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression, content)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")
@@ -1295,6 +1379,10 @@ def __exit__(
@abstractmethod
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...
+ @property
+ def next_row_id(self) -> int | None:
+ return None
+
def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter:
self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
return self
@@ -1351,9 +1439,7 @@ def __init__(
self._commit_snapshot_id = snapshot_id
self._sequence_number = sequence_number
- def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
- wrapped_manifest_file = copy(manifest_file)
-
+ def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile:
if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
# if the sequence number is being assigned here, then the manifest must be created by the current operation.
# To validate this, check that the snapshot id matches the current commit
@@ -1374,6 +1460,56 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file.min_sequence_number = self._sequence_number
return wrapped_manifest_file
+ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+ return self._prepare_manifest_for_commit(copy(manifest_file))
+
+
+class ManifestListWriterV3(ManifestListWriterV2):
+ _next_row_id: int
+
+ def __init__(
+ self,
+ output_file: OutputFile,
+ snapshot_id: int,
+ parent_snapshot_id: int | None,
+ sequence_number: int,
+ snapshot_first_row_id: int,
+ compression: AvroCompressionCodec,
+ ):
+ super().__init__(
+ output_file=output_file,
+ snapshot_id=snapshot_id,
+ parent_snapshot_id=parent_snapshot_id,
+ sequence_number=sequence_number,
+ compression=compression,
+ )
+ self._format_version = 3
+ self._meta = {
+ "snapshot-id": str(snapshot_id),
+ "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
+ "sequence-number": str(sequence_number),
+ "first-row-id": str(snapshot_first_row_id),
+ "format-version": "3",
+ AVRO_CODEC_KEY: compression,
+ }
+ self._next_row_id = snapshot_first_row_id
+
+ @property
+ def next_row_id(self) -> int | None:
+ return self._next_row_id
+
+ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+ wrapped = self._prepare_manifest_for_commit(copy(manifest_file))
+ if wrapped.content == ManifestContent.DATA and wrapped.first_row_id is None:
+ if wrapped.existing_rows_count is None or wrapped.added_rows_count is None:
+ raise ValueError(
+ "Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: "
+ + wrapped.manifest_path
+ )
+ wrapped.first_row_id = self._next_row_id
+ self._next_row_id += wrapped.existing_rows_count + wrapped.added_rows_count
+ return wrapped
+
def write_manifest_list(
format_version: TableVersion,
@@ -1382,6 +1518,7 @@ def write_manifest_list(
parent_snapshot_id: int | None,
sequence_number: int | None,
avro_compression: AvroCompressionCodec,
+ snapshot_first_row_id: int | None = None,
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
@@ -1389,5 +1526,18 @@ def write_manifest_list(
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
+ elif format_version == 3:
+ if sequence_number is None:
+ raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}")
+ if snapshot_first_row_id is None:
+ raise ValueError(f"Snapshot-first-row-id is required for V3 tables: {snapshot_first_row_id}")
+ return ManifestListWriterV3(
+ output_file=output_file,
+ snapshot_id=snapshot_id,
+ parent_snapshot_id=parent_snapshot_id,
+ sequence_number=sequence_number,
+ snapshot_first_row_id=snapshot_first_row_id,
+ compression=avro_compression,
+ )
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 597f62632f..b086111fdc 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -48,7 +48,7 @@
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.maintenance import MaintenanceTable
-from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
+from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
@@ -293,7 +293,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
Returns:
The alter table builder.
"""
- if format_version not in {1, 2}:
+ if not 1 <= format_version <= SUPPORTED_TABLE_FORMAT_VERSION:
raise ValueError(f"Unsupported table format version: {format_version}")
if format_version < self.table_metadata.format_version:
@@ -730,6 +730,23 @@ def delete(
# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
+ if self.table_metadata.format_version >= 3:
+ # A partial (copy-on-write) delete physically rewrites surviving rows into a
+ # NEW data file. Per the v3 spec those rows must KEEP their original _row_id
+ # (field 142 / the _row_id metadata column). After a partial delete the
+ # survivors are generally non-contiguous (e.g. 0,1,4,5), so their lineage can
+ # only be preserved by materializing an explicit per-row _row_id column on read
+ # and persisting it on rewrite. PyIceberg has no read-side _row_id
+ # materialization yet, so doing this rewrite would silently RE-NUMBER the
+ # surviving rows and corrupt row lineage. We fail loudly instead.
+ raise NotImplementedError(
+ "v3 copy-on-write delete that requires rewriting a data file is not "
+ "supported yet: surviving rows would lose their _row_id lineage because "
+ "PyIceberg cannot materialize/preserve per-row _row_id (field 142) across a "
+ "physical rewrite. Whole-file deletes (which drop entire data files and "
+ "preserve survivors' row ids) are supported. Track: read-side _row_id "
+ "materialization."
+ )
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema())
diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py
index 3f513aabe5..c24a9327cf 100644
--- a/pyiceberg/table/delete_file_index.py
+++ b/pyiceberg/table/delete_file_index.py
@@ -78,6 +78,12 @@ def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool:
def _referenced_data_file_path(delete_file: DataFile) -> str | None:
"""Return the path, if the path bounds evaluate to the same location."""
+ # Deletion vectors (and any delete file that targets a single data file) carry the
+ # referenced data file path explicitly in field 143. Prefer it when present.
+ referenced = delete_file.referenced_data_file
+ if referenced is not None:
+ return referenced
+
lower_bounds = delete_file.lower_bounds
upper_bounds = delete_file.upper_bounds
diff --git a/pyiceberg/table/encryption.py b/pyiceberg/table/encryption.py
new file mode 100644
index 0000000000..5fece598ca
--- /dev/null
+++ b/pyiceberg/table/encryption.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Encryption metadata plumbing only.
+
+This module models encrypted table metadata for faithful JSON round-tripping.
+It does not implement cryptography, KMS integration, or key wrapping.
+"""
+
+from __future__ import annotations
+
+from pydantic import Field
+
+from pyiceberg.typedef import IcebergBaseModel
+
+
+class EncryptedKey(IcebergBaseModel):
+ key_id: str = Field(alias="key-id")
+ encrypted_key_metadata: str = Field(alias="encrypted-key-metadata")
+ encrypted_by_id: str | None = Field(alias="encrypted-by-id", default=None)
+ properties: dict[str, str] | None = Field(default=None)
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 26b6e3d3ad..64264fe892 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -28,6 +28,7 @@
from pyiceberg.exceptions import ValidationError
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
+from pyiceberg.table.encryption import EncryptedKey
from pyiceberg.table.name_mapping import NameMapping, parse_mapping_from_json
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import MetadataLogEntry, Snapshot, SnapshotLogEntry
@@ -66,7 +67,7 @@
INITIAL_SPEC_ID = 0
DEFAULT_SCHEMA_ID = 0
-SUPPORTED_TABLE_FORMAT_VERSION = 2
+SUPPORTED_TABLE_FORMAT_VERSION = 3
def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]:
@@ -574,8 +575,8 @@ def construct_refs(self) -> TableMetadata:
next_row_id: int | None = Field(alias="next-row-id", default=None)
"""A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""
- def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str:
- raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")
+ encryption_keys: list[EncryptedKey] | None = Field(alias="encryption-keys", default=None)
+ """A list of encrypted keys used by this table."""
TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")]
@@ -645,6 +646,7 @@ def new_table_metadata(
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
+ next_row_id=0,
)
else:
raise ValidationError(f"Unknown format version: {format_version}")
diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py
index 917d387f45..12da692809 100644
--- a/pyiceberg/table/puffin.py
+++ b/pyiceberg/table/puffin.py
@@ -14,22 +14,34 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import io
import math
+import zlib
+from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal
from pydantic import Field
from pyroaring import BitMap, FrozenBitMap
+from pyiceberg import __version__
+from pyiceberg.io import OutputFile
from pyiceberg.typedef import IcebergBaseModel
if TYPE_CHECKING:
import pyarrow as pa
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
+DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
+# Reserved field id of the row position (_pos) metadata column, referenced by
+# deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION)
+ROW_POSITION_FIELD_ID = 2147483645
def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
@@ -62,6 +74,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
return bitmaps
+def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
+ """
+ Serialize a dictionary of bitmaps into a byte array.
+
+ The format is:
+ - 8 bytes: number of bitmaps (little-endian)
+ - For each bitmap:
+ - 4 bytes: key (little-endian)
+ - n bytes: serialized bitmap
+ """
+ with io.BytesIO() as out:
+ sorted_keys = sorted(bitmaps.keys())
+
+ # number of bitmaps
+ out.write(len(sorted_keys).to_bytes(8, "little"))
+
+ for key in sorted_keys:
+ if key < 0:
+ raise ValueError(f"Invalid unsigned key: {key}")
+ if key > MAX_JAVA_SIGNED:
+ raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
+
+ # key
+ out.write(key.to_bytes(4, "little"))
+ # bitmap
+ out.write(bitmaps[key].serialize())
+ return out.getvalue()
+
+
class PuffinBlobMetadata(IcebergBaseModel):
type: Literal["deletion-vector-v1"] = Field()
fields: list[int] = Field()
@@ -81,7 +122,10 @@ class Footer(IcebergBaseModel):
def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
import pyarrow as pa
- return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))
+ return pa.chunked_array(
+ ([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps)),
+ type=pa.int64(),
+ )
class PuffinFile:
@@ -114,3 +158,152 @@ def __init__(self, puffin: bytes) -> None:
def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
+
+
+class PuffinWriter:
+ """Writes a Puffin file containing a single deletion-vector-v1 blob to an output file."""
+
+ _output_file: OutputFile
+ _blobs: list[PuffinBlobMetadata]
+ _blob_payloads: list[bytes]
+ _created_by: str
+ written_blobs: list[PuffinBlobMetadata]
+ """Blob metadata (with resolved offset/length) of the blobs written by ``finish()``."""
+
+ def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None:
+ self._output_file = output_file
+ self._blobs = []
+ self._blob_payloads = []
+ self._created_by = created_by if created_by is not None else f"PyIceberg version {__version__}"
+ self.written_blobs = []
+
+ def set_blob(
+ self,
+ positions: Iterable[int],
+ referenced_data_file: str,
+ ) -> None:
+ """Set the deletion vector blob for a data file, replacing any previously set blob.
+
+ Args:
+ positions: Zero-based positions of the deleted rows in the referenced data file.
+ referenced_data_file: Location of the data file the deletion vector applies to.
+ """
+ # We only support one blob at the moment
+ self._blobs = []
+ self._blob_payloads = []
+
+ bitmaps: dict[int, BitMap] = {}
+ for pos in positions:
+ if pos < 0:
+ raise ValueError(f"Invalid position: {pos}, positions must be non-negative")
+ key = pos >> 32
+ low_bits = pos & 0xFFFFFFFF
+ if key not in bitmaps:
+ bitmaps[key] = BitMap()
+ bitmaps[key].add(low_bits)
+
+ if not bitmaps:
+ raise ValueError("Deletion vector must contain at least one position")
+
+ cardinality = sum(len(bm) for bm in bitmaps.values())
+ vector_payload = _serialize_bitmaps(bitmaps)
+
+ # deletion-vector-v1 blob layout: combined length of magic and vector (4 bytes, big-endian),
+ # the DV magic bytes, the serialized vector, and a CRC-32 checksum of magic + vector (4 bytes, big-endian)
+ blob_content = DELETION_VECTOR_MAGIC + vector_payload
+ self._blob_payloads.append(
+ len(blob_content).to_bytes(4, "big") + blob_content + zlib.crc32(blob_content).to_bytes(4, "big")
+ )
+
+ self._blobs.append(
+ PuffinBlobMetadata(
+ type="deletion-vector-v1",
+ fields=[ROW_POSITION_FIELD_ID],
+ # -1 means the snapshot id and sequence number are inherited at commit time
+ snapshot_id=-1,
+ sequence_number=-1,
+ # offset and length are placeholders; finish() fills them in when assembling the file
+ offset=0,
+ length=0,
+ properties={PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)},
+ compression_codec=None,
+ )
+ )
+
+ def finish(self) -> int:
+ """Write the Puffin file to the output file and return its size in bytes."""
+ with io.BytesIO() as out:
+ out.write(MAGIC_BYTES)
+
+ blobs_metadata: list[PuffinBlobMetadata] = []
+ for blob_metadata, blob_payload in zip(self._blobs, self._blob_payloads, strict=True):
+ blobs_metadata.append(blob_metadata.model_copy(update={"offset": out.tell(), "length": len(blob_payload)}))
+ out.write(blob_payload)
+
+ footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by})
+ footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")
+
+ out.write(MAGIC_BYTES)
+ out.write(footer_payload_bytes)
+ out.write(len(footer_payload_bytes).to_bytes(4, "little"))
+ out.write((0).to_bytes(4, "little")) # flags
+ out.write(MAGIC_BYTES)
+
+ puffin_bytes = out.getvalue()
+
+ self.written_blobs = blobs_metadata
+
+ with self._output_file.create(overwrite=True) as output_stream:
+ output_stream.write(puffin_bytes)
+
+ return len(puffin_bytes)
+
+
+def write_deletion_vector(
+ output_file: OutputFile,
+ referenced_data_file: str,
+ positions: Iterable[int],
+ partition: "Record",
+ spec_id: int,
+) -> "DataFile":
+ """Write a single-blob deletion-vector Puffin file and return its manifest entry DataFile.
+
+ The returned ``DataFile`` has ``content == POSITION_DELETES`` and
+ ``file_format == PUFFIN`` (the Iceberg spec models a v3 deletion vector as a
+ position-delete file whose physical format is Puffin; there is no distinct
+ ``DataFileContent`` enum value). It carries ``referenced_data_file`` (field 143),
+ ``content_offset`` (field 144) and ``content_size_in_bytes`` (field 145) so the
+ scan planner can locate and apply the blob.
+
+ Args:
+ output_file: Destination for the ``.puffin`` file.
+ referenced_data_file: Path of the data file the positions refer to.
+ positions: Zero-based row positions to delete in the referenced data file.
+ partition: Partition record of the referenced data file (deletes share its partition).
+ spec_id: Partition spec id of the referenced data file.
+ """
+ from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
+
+ positions = sorted(set(positions))
+
+ writer = PuffinWriter(output_file)
+ writer.set_blob(positions, referenced_data_file)
+ file_size = writer.finish()
+
+ if len(writer.written_blobs) != 1:
+ raise ValueError("Expected exactly one deletion-vector blob to be written")
+ blob = writer.written_blobs[0]
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.POSITION_DELETES,
+ file_path=output_file.location,
+ file_format=FileFormat.PUFFIN,
+ partition=partition,
+ record_count=len(positions),
+ file_size_in_bytes=file_size,
+ referenced_data_file=referenced_data_file,
+ content_offset=blob.offset,
+ content_size_in_bytes=blob.length,
+ )
+ data_file.spec_id = spec_id
+ return data_file
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 7e4c6eb1ec..ef24a8dd44 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -252,6 +252,9 @@ class Snapshot(IcebergBaseModel):
added_rows: int | None = Field(
alias="added-rows", default=None, description="The upper bound of the number of rows with assigned row IDs"
)
+ key_id: str | None = Field(
+ alias="key-id", default=None, description="ID of the encryption key that encrypts the manifest list key metadata"
+ )
def __str__(self) -> str:
"""Return the string representation of the Snapshot class."""
@@ -273,6 +276,7 @@ def __repr__(self) -> str:
f"schema_id={self.schema_id}" if self.schema_id is not None else None,
f"first_row_id={self.first_row_id}" if self.first_row_id is not None else None,
f"added_rows={self.added_rows}" if self.added_rows is not None else None,
+ f"key_id='{self.key_id}'" if self.key_id is not None else None,
]
filtered_fields = [field for field in fields if field is not None]
return f"Snapshot({', '.join(filtered_fields)})"
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index 64838b0bd6..3da5399aed 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -29,7 +29,7 @@
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
+from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import (
MetadataLogEntry,
@@ -322,9 +322,16 @@ def _(
return base_metadata
updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version})
+ updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata)
+ if (
+ isinstance(updated_metadata, TableMetadataV3)
+ and base_metadata.format_version < 3
+ and updated_metadata.next_row_id is None
+ ):
+ updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0})
context.add_update(update)
- return TableMetadataUtil._construct_without_validation(updated_metadata)
+ return updated_metadata
@_apply_table_update.register(SetPropertiesUpdate)
@@ -435,7 +442,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None:
raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists")
elif (
- base_metadata.format_version == 2
+ base_metadata.format_version >= 2
and update.snapshot.sequence_number is not None
and update.snapshot.sequence_number <= base_metadata.last_sequence_number
and update.snapshot.parent_snapshot_id is not None
@@ -446,6 +453,10 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
)
elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None:
raise ValueError("Cannot add snapshot without first row id")
+ elif base_metadata.format_version >= 3 and update.snapshot.added_rows is None:
+ raise ValueError("Cannot add snapshot without added rows")
+ elif base_metadata.format_version >= 3 and base_metadata.next_row_id is None:
+ raise ValueError("Cannot add a snapshot when table next-row-id is null")
elif (
base_metadata.format_version >= 3
and update.snapshot.first_row_id is not None
@@ -458,18 +469,19 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
)
context.add_update(update)
- return base_metadata.model_copy(
- update={
- "last_updated_ms": update.snapshot.timestamp_ms,
- "last_sequence_number": update.snapshot.sequence_number,
- "snapshots": base_metadata.snapshots + [update.snapshot],
- "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows
- if base_metadata.format_version >= 3
- and base_metadata.next_row_id is not None
- and update.snapshot.added_rows is not None
- else None,
- }
- )
+ metadata_updates: dict[str, Any] = {
+ "last_updated_ms": update.snapshot.timestamp_ms,
+ "last_sequence_number": update.snapshot.sequence_number,
+ "snapshots": base_metadata.snapshots + [update.snapshot],
+ }
+ if base_metadata.format_version >= 3:
+ if base_metadata.next_row_id is None:
+ raise ValueError("Cannot add a snapshot when table next-row-id is null")
+ if update.snapshot.added_rows is None:
+ raise ValueError("Cannot add snapshot without added rows")
+ metadata_updates["next_row_id"] = base_metadata.next_row_id + update.snapshot.added_rows
+
+ return base_metadata.model_copy(update=metadata_updates)
@_apply_table_update.register(SetSnapshotRefUpdate)
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 7931edacdd..c7d9155035 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -21,6 +21,7 @@
from abc import abstractmethod
from collections import defaultdict
from collections.abc import Callable
+from copy import copy
from datetime import datetime
from functools import cached_property
from typing import TYPE_CHECKING, Generic
@@ -264,8 +265,9 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
)
def _commit(self) -> UpdatesAndRequirements:
+ table_metadata = self._transaction.table_metadata
new_manifests = self._manifests()
- next_sequence_number = self._transaction.table_metadata.next_sequence_number()
+ next_sequence_number = table_metadata.next_sequence_number()
summary = self._summary(self.snapshot_properties)
file_name = _new_manifest_list_file_name(
@@ -276,20 +278,31 @@ def _commit(self) -> UpdatesAndRequirements:
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)
+ if table_metadata.format_version >= 3:
+ snapshot_first_row_id = table_metadata.next_row_id
+ if snapshot_first_row_id is None:
+ raise ValueError("Cannot commit to a v3 table without next-row-id")
+ else:
+ snapshot_first_row_id = None
+
with write_manifest_list(
- format_version=self._transaction.table_metadata.format_version,
+ format_version=table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
sequence_number=next_sequence_number,
avro_compression=self._compression,
+ snapshot_first_row_id=snapshot_first_row_id,
) as writer:
writer.add_manifests(new_manifests)
- first_row_id: int | None = None
-
- if self._transaction.table_metadata.format_version >= 3:
- first_row_id = self._transaction.table_metadata.next_row_id
+ if table_metadata.format_version >= 3:
+ if writer.next_row_id is None or snapshot_first_row_id is None:
+ raise ValueError("Cannot determine added rows for a v3 snapshot without row IDs")
+ added_rows = writer.next_row_id - snapshot_first_row_id
+ else:
+ added_rows = None
+ first_row_id = snapshot_first_row_id
snapshot = Snapshot(
snapshot_id=self._snapshot_id,
@@ -297,8 +310,9 @@ def _commit(self) -> UpdatesAndRequirements:
manifest_list=manifest_list_file_path,
sequence_number=next_sequence_number,
summary=summary,
- schema_id=self._transaction.table_metadata.current_schema_id,
+ schema_id=table_metadata.current_schema_id,
first_row_id=first_row_id,
+ added_rows=added_rows,
)
add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot)
@@ -428,6 +442,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
# avoid copying metadata for each evaluator
table_metadata = self._transaction.table_metadata
+ is_v3 = table_metadata.format_version >= 3
schema = table_metadata.schema()
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
@@ -456,16 +471,53 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
# It is relevant, let's check out the content
deleted_entries = []
existing_entries = []
+ # For v3 tables, surviving data files must KEEP the row ids they were
+ # already assigned. Rewriting the manifest would otherwise let the
+ # manifest-list writer re-number them (the source manifest's
+ # first_row_id is dropped). We materialize each surviving file's
+ # absolute _row_id into DataFile field 142 so it survives the rewrite.
+ # A file inherits row id = manifest.first_row_id + sum(record_count of
+ # all preceding data files in the manifest), unless it already carries
+ # an explicit field-142 value.
+ row_id_cursor: int | None = manifest_file.first_row_id if is_v3 else None
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+ materialized_data_file = entry.data_file
+ if is_v3:
+ explicit_first_row_id = materialized_data_file.first_row_id
+ if explicit_first_row_id is not None:
+ row_id_cursor = explicit_first_row_id
+ if row_id_cursor is None:
+ raise NotImplementedError(
+ "Cannot perform a v3 copy-on-write delete that preserves row "
+ "lineage: the source manifest is missing a first_row_id and "
+ f"the data file {materialized_data_file.file_path} has no "
+ "explicit field-142 first_row_id, so surviving rows cannot be "
+ "renumber-safely rewritten."
+ )
+ if explicit_first_row_id is None:
+ # Persist the inherited absolute row id so it survives the rewrite.
+ materialized_data_file = copy(materialized_data_file)
+ materialized_data_file.first_row_id = row_id_cursor
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
# Based on the metadata, it can be dropped right away
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
self._deleted_data_files.add(entry.data_file)
else:
# Based on the metadata, we cannot determine if it can be deleted
- existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
+ surviving_entry = _copy_with_new_status(entry, ManifestEntryStatus.EXISTING)
+ if is_v3:
+ surviving_entry = ManifestEntry.from_args(
+ status=surviving_entry.status,
+ snapshot_id=surviving_entry.snapshot_id,
+ sequence_number=surviving_entry.sequence_number,
+ file_sequence_number=surviving_entry.file_sequence_number,
+ data_file=materialized_data_file,
+ )
+ existing_entries.append(surviving_entry)
if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
partial_rewrites_needed = True
+ if is_v3 and row_id_cursor is not None:
+ row_id_cursor += materialized_data_file.record_count
if len(deleted_entries) > 0:
total_deleted_entries += deleted_entries
@@ -475,7 +527,15 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
with self.new_manifest_writer(spec=self.spec(manifest_file.partition_spec_id)) as writer:
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
- existing_manifests.append(writer.to_manifest_file())
+ rewritten_manifest = writer.to_manifest_file()
+ if is_v3:
+ # Every surviving data file now carries its own field-142
+ # row id, so the manifest-list writer must NOT re-assign a
+ # block. Inherit the source manifest's first_row_id (its
+ # range start) to keep the manifest itself row-id stable and
+ # prevent re-numbering of survivors.
+ rewritten_manifest.first_row_id = manifest_file.first_row_id
+ existing_manifests.append(rewritten_manifest)
else:
existing_manifests.append(manifest_file)
else:
@@ -529,6 +589,44 @@ def _deleted_entries(self) -> list[ManifestEntry]:
return []
+class _AppendDeletionVectors(_FastAppendFiles):
+ """Appends deletion-vector (Puffin) delete files, producing a DELETE snapshot.
+
+ Each added file must be a ``DataFileContent.POSITION_DELETES`` file with
+ ``file_format == FileFormat.PUFFIN`` and a populated ``referenced_data_file``
+ (field 143). They are written into a DELETES manifest so the scan planner routes
+ them through the delete-file index and the existing Puffin reader applies them.
+ """
+
+ def _manifests(self) -> list[ManifestFile]:
+ if not self._added_data_files:
+ raise ValueError("No deletion vectors to commit")
+
+ def _write_dv_manifest() -> list[ManifestFile]:
+ with write_manifest(
+ format_version=self._transaction.table_metadata.format_version,
+ spec=self._transaction.table_metadata.spec(),
+ schema=self.schema(),
+ output_file=self.new_manifest_output(),
+ snapshot_id=self._snapshot_id,
+ avro_compression=self._compression,
+ content=ManifestContent.DELETES,
+ ) as writer:
+ for data_file in self._added_data_files:
+ writer.add(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=self._snapshot_id,
+ sequence_number=None,
+ file_sequence_number=None,
+ data_file=data_file,
+ )
+ )
+ return [writer.to_manifest_file()]
+
+ return self._process_manifests(_write_dv_manifest() + self._existing_manifests())
+
+
class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
@@ -729,6 +827,17 @@ def delete(self) -> _DeleteFiles:
snapshot_properties=self._snapshot_properties,
)
+ def append_deletion_vectors(self) -> _AppendDeletionVectors:
+ if self._transaction.table_metadata.format_version < 3:
+ raise ValueError("Deletion vectors require table format version 3 or higher")
+ return _AppendDeletionVectors(
+ operation=Operation.DELETE,
+ transaction=self._transaction,
+ io=self._io,
+ branch=self._branch,
+ snapshot_properties=self._snapshot_properties,
+ )
+
class _ManifestMergeManager(Generic[U]):
_target_size_bytes: int
@@ -751,8 +860,27 @@ def _group_by_spec(self, manifests: list[ManifestFile]) -> dict[int, list[Manife
return groups
def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> ManifestFile:
+ format_version = self._snapshot_producer._transaction.table_metadata.format_version
+ # For v3 the merged manifest inherits min(first_row_id) and represents the contiguous
+ # range [min, min + total_rows). Reading it back assigns row ids to its DATA files in
+ # ENTRY order, so we must write entries in ascending source-manifest row-id order;
+ # otherwise inheritance would silently re-number rows. The writer emits manifests
+ # newest-first, so we sort here. DATA manifests carrying a first_row_id sort by it;
+ # any without (or delete manifests) keep a stable relative position at the front.
+ if format_version >= 3:
+ ordered_bin = sorted(
+ manifest_bin,
+ key=lambda manifest: (
+ manifest.first_row_id
+ if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None
+ else -1
+ ),
+ )
+ else:
+ ordered_bin = manifest_bin
+
with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer:
- for manifest in manifest_bin:
+ for manifest in ordered_bin:
for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False):
if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# only files deleted by this snapshot should be added to the new manifest
@@ -764,21 +892,85 @@ def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> Ma
# add all non-deleted files from the old manifest as existing files
writer.existing(entry)
- return writer.to_manifest_file()
+ merged_manifest = writer.to_manifest_file()
+ inherited_first_row_ids = [
+ manifest.first_row_id
+ for manifest in manifest_bin
+ if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None
+ ]
+ if inherited_first_row_ids:
+ merged_manifest.first_row_id = min(inherited_first_row_ids)
+
+ return merged_manifest
+
+ def _v3_row_ids_are_contiguous(self, manifest_bin: list[ManifestFile]) -> bool:
+ """Whether the v3 DATA manifests cover a single gapless, non-overlapping row-id range.
+
+ A merged manifest inherits ``min(first_row_id)`` and, because the entries are written
+ in ascending row-id order (see ``_create_manifest``), it represents the contiguous
+ range ``[min, min + total_rows)``. That is only correct when the source manifests'
+ ranges tile that interval exactly. We therefore SORT the ranges by ``first_row_id``
+ and verify they are gapless/non-overlapping. The input order does NOT matter — the
+ writer emits manifests newest-first (descending), so requiring ascending input here
+ would (and previously did) disable v3 merging entirely.
+ """
+ assigned = [
+ manifest
+ for manifest in manifest_bin
+ if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None
+ ]
+ if len(assigned) <= 1:
+ return True
+
+ sorted_assigned = sorted(assigned, key=lambda manifest: manifest.first_row_id or 0)
+
+ cursor = sorted_assigned[0].first_row_id
+ if cursor is None:
+ return False
+
+ for manifest in sorted_assigned:
+ existing_rows_count = manifest.existing_rows_count
+ added_rows_count = manifest.added_rows_count
+ if existing_rows_count is None or added_rows_count is None:
+ return False
+ if manifest.first_row_id != cursor:
+ return False
+ cursor += existing_rows_count + added_rows_count
+
+ return True
def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: list[ManifestFile]) -> list[ManifestFile]:
packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False)
bins: list[list[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length)
+ format_version = self._snapshot_producer._transaction.table_metadata.format_version
def merge_bin(manifest_bin: list[ManifestFile]) -> list[ManifestFile]:
output_manifests = []
if len(manifest_bin) == 1:
output_manifests.append(manifest_bin[0])
+ elif (
+ format_version >= 3
+ and first_manifest in manifest_bin
+ and first_manifest.content == ManifestContent.DATA
+ and first_manifest.first_row_id is None
+ ):
+ remaining_manifests = [manifest for manifest in manifest_bin if manifest != first_manifest]
+ output_manifests.append(first_manifest)
+ if len(remaining_manifests) == 1:
+ output_manifests.append(remaining_manifests[0])
+ elif len(remaining_manifests) < self._min_count_to_merge:
+ output_manifests.extend(remaining_manifests)
+ elif not self._v3_row_ids_are_contiguous(remaining_manifests):
+ output_manifests.extend(remaining_manifests)
+ else:
+ output_manifests.append(self._create_manifest(spec_id, remaining_manifests))
elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge:
# if the bin has the first manifest (the new data files or an appended manifest file) then only
# merge it if the number of manifests is above the minimum count. this is applied only to bins
# with an in-memory manifest so that large manifests don't prevent merging older groups.
output_manifests.extend(manifest_bin)
+ elif format_version >= 3 and not self._v3_row_ids_are_contiguous(manifest_bin):
+ output_manifests.extend(manifest_bin)
else:
output_manifests.append(self._create_manifest(spec_id, manifest_bin))
diff --git a/pyiceberg/utils/geospatial.py b/pyiceberg/utils/geospatial.py
new file mode 100644
index 0000000000..11b381d801
--- /dev/null
+++ b/pyiceberg/utils/geospatial.py
@@ -0,0 +1,486 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import math
+import struct
+from dataclasses import dataclass
+
+_WKB_POINT = 1
+_WKB_LINESTRING = 2
+_WKB_POLYGON = 3
+_WKB_MULTIPOINT = 4
+_WKB_MULTILINESTRING = 5
+_WKB_MULTIPOLYGON = 6
+_WKB_GEOMETRYCOLLECTION = 7
+
+_EWKB_Z_FLAG = 0x80000000
+_EWKB_M_FLAG = 0x40000000
+_EWKB_SRID_FLAG = 0x20000000
+
+# The 32-byte bound wire format is shared by XYM and XYZM. XYM reserves a NaN
+# value in the z slot as a sentinel, so a real NaN z cannot be serialized when
+# m is present without losing whether z existed.
+
+
+@dataclass(frozen=True)
+class GeospatialBound:
+ x: float
+ y: float
+ z: float | None = None
+ m: float | None = None
+
+ @property
+ def has_z(self) -> bool:
+ return self.z is not None
+
+ @property
+ def has_m(self) -> bool:
+ return self.m is not None
+
+
+@dataclass(frozen=True)
+class GeometryEnvelope:
+ x_min: float
+ y_min: float
+ z_min: float | None
+ m_min: float | None
+ x_max: float
+ y_max: float
+ z_max: float | None
+ m_max: float | None
+
+ def to_min_bound(self) -> GeospatialBound:
+ return GeospatialBound(x=self.x_min, y=self.y_min, z=self.z_min, m=self.m_min)
+
+ def to_max_bound(self) -> GeospatialBound:
+ return GeospatialBound(x=self.x_max, y=self.y_max, z=self.z_max, m=self.m_max)
+
+
+def serialize_geospatial_bound(bound: GeospatialBound) -> bytes:
+ if bound.z is not None and bound.m is not None and math.isnan(bound.z):
+ raise ValueError("Cannot serialize geospatial bound with NaN z and m; NaN z is reserved as the XYM sentinel")
+
+ if bound.z is None and bound.m is None:
+ return struct.pack("
GeospatialBound:
+ if len(raw) == 16:
+ x, y = struct.unpack(" GeometryEnvelope | None:
+ reader = _WKBReader(wkb)
+ accumulator = _EnvelopeAccumulator(is_geography=is_geography)
+ _parse_geometry(reader, accumulator)
+ if reader.remaining() != 0:
+ raise ValueError(f"Trailing bytes found after parsing WKB: {reader.remaining()}")
+ return accumulator.finish()
+
+
+def merge_envelopes(left: GeometryEnvelope, right: GeometryEnvelope, is_geography: bool) -> GeometryEnvelope:
+ if is_geography:
+ x_min, x_max = _merge_longitude_intervals(left.x_min, left.x_max, right.x_min, right.x_max)
+ else:
+ x_min, x_max = min(left.x_min, right.x_min), max(left.x_max, right.x_max)
+
+ return GeometryEnvelope(
+ x_min=x_min,
+ y_min=min(left.y_min, right.y_min),
+ z_min=_merge_optional_min(left.z_min, right.z_min),
+ m_min=_merge_optional_min(left.m_min, right.m_min),
+ x_max=x_max,
+ y_max=max(left.y_max, right.y_max),
+ z_max=_merge_optional_max(left.z_max, right.z_max),
+ m_max=_merge_optional_max(left.m_max, right.m_max),
+ )
+
+
+class GeospatialStatsAggregator:
+ def __init__(self, is_geography: bool) -> None:
+ self.is_geography = is_geography
+ self._envelope: GeometryEnvelope | None = None
+
+ def add(self, wkb: bytes) -> None:
+ envelope = extract_envelope_from_wkb(wkb, self.is_geography)
+ if envelope is None:
+ return
+
+ if self._envelope is None:
+ self._envelope = envelope
+ else:
+ self._envelope = merge_envelopes(self._envelope, envelope, self.is_geography)
+
+ def min_bound(self) -> GeospatialBound | None:
+ if self._envelope is None:
+ return None
+ return self._envelope.to_min_bound()
+
+ def max_bound(self) -> GeospatialBound | None:
+ if self._envelope is None:
+ return None
+ return self._envelope.to_max_bound()
+
+ def serialized_min(self) -> bytes | None:
+ bound = self.min_bound()
+ if bound is None:
+ return None
+ return serialize_geospatial_bound(bound)
+
+ def serialized_max(self) -> bytes | None:
+ bound = self.max_bound()
+ if bound is None:
+ return None
+ return serialize_geospatial_bound(bound)
+
+
+def _merge_optional_min(left: float | None, right: float | None) -> float | None:
+ if left is None:
+ return right
+ if right is None:
+ return left
+ return min(left, right)
+
+
+def _merge_optional_max(left: float | None, right: float | None) -> float | None:
+ if left is None:
+ return right
+ if right is None:
+ return left
+ return max(left, right)
+
+
+@dataclass
+class _EnvelopeAccumulator:
+ is_geography: bool
+ x_min: float | None = None
+ y_min: float | None = None
+ z_min: float | None = None
+ m_min: float | None = None
+ x_max: float | None = None
+ y_max: float | None = None
+ z_max: float | None = None
+ m_max: float | None = None
+ longitudes: list[float] | None = None
+
+ def __post_init__(self) -> None:
+ if self.is_geography:
+ self.longitudes = []
+
+ def add_point(self, x: float, y: float, z: float | None, m: float | None) -> None:
+ if math.isnan(x) or math.isnan(y):
+ return
+
+ if self.is_geography:
+ if self.longitudes is None:
+ self.longitudes = []
+ self.longitudes.append(_normalize_longitude(x))
+ else:
+ self.x_min = x if self.x_min is None else min(self.x_min, x)
+ self.x_max = x if self.x_max is None else max(self.x_max, x)
+
+ self.y_min = y if self.y_min is None else min(self.y_min, y)
+ self.y_max = y if self.y_max is None else max(self.y_max, y)
+
+ if z is not None and not math.isnan(z):
+ self.z_min = z if self.z_min is None else min(self.z_min, z)
+ self.z_max = z if self.z_max is None else max(self.z_max, z)
+
+ if m is not None and not math.isnan(m):
+ self.m_min = m if self.m_min is None else min(self.m_min, m)
+ self.m_max = m if self.m_max is None else max(self.m_max, m)
+
+ def finish(self) -> GeometryEnvelope | None:
+ if self.y_min is None or self.y_max is None:
+ return None
+
+ if self.is_geography:
+ if not self.longitudes:
+ return None
+ x_min, x_max = _minimal_longitude_interval(self.longitudes)
+ else:
+ if self.x_min is None or self.x_max is None:
+ return None
+ x_min, x_max = self.x_min, self.x_max
+
+ return GeometryEnvelope(
+ x_min=x_min,
+ y_min=self.y_min,
+ z_min=self.z_min,
+ m_min=self.m_min,
+ x_max=x_max,
+ y_max=self.y_max,
+ z_max=self.z_max,
+ m_max=self.m_max,
+ )
+
+
+class _WKBReader:
+ def __init__(self, payload: bytes) -> None:
+ self._payload = payload
+ self._offset = 0
+
+ def remaining(self) -> int:
+ return len(self._payload) - self._offset
+
+ def read_byte(self) -> int:
+ self._ensure_size(1)
+ value = self._payload[self._offset]
+ self._offset += 1
+ return value
+
+ def read_uint32(self, little_endian: bool) -> int:
+ return int(self._read_fmt("I"))
+
+ def read_double(self, little_endian: bool) -> float:
+ return float(self._read_fmt("d"))
+
+ def _read_fmt(self, fmt: str) -> float | int:
+ size = struct.calcsize(fmt)
+ self._ensure_size(size)
+ value = struct.unpack_from(fmt, self._payload, self._offset)[0]
+ self._offset += size
+ return value
+
+ def _ensure_size(self, expected: int) -> None:
+ if self._offset + expected > len(self._payload):
+ raise ValueError("Unexpected end of WKB payload")
+
+
+def _parse_geometry(reader: _WKBReader, accumulator: _EnvelopeAccumulator) -> None:
+ little_endian = _parse_byte_order(reader.read_byte())
+ raw_type = reader.read_uint32(little_endian)
+ geometry_type, has_z, has_m = _parse_geometry_type(raw_type)
+
+ if raw_type & _EWKB_SRID_FLAG:
+ reader.read_uint32(little_endian)
+
+ if geometry_type == _WKB_POINT:
+ _parse_point(reader, accumulator, little_endian, has_z, has_m)
+ elif geometry_type == _WKB_LINESTRING:
+ _parse_points(reader, accumulator, little_endian, has_z, has_m)
+ elif geometry_type == _WKB_POLYGON:
+ _parse_polygon(reader, accumulator, little_endian, has_z, has_m)
+ elif geometry_type in (_WKB_MULTIPOINT, _WKB_MULTILINESTRING, _WKB_MULTIPOLYGON, _WKB_GEOMETRYCOLLECTION):
+ _parse_collection(reader, accumulator, little_endian)
+ else:
+ raise ValueError(f"Unsupported WKB geometry type: {geometry_type}")
+
+
+def _parse_byte_order(order: int) -> bool:
+ if order == 1:
+ return True
+ if order == 0:
+ return False
+ raise ValueError(f"Unsupported WKB byte order marker: {order}")
+
+
+def _parse_geometry_type(raw_type: int) -> tuple[int, bool, bool]:
+ has_z = bool(raw_type & _EWKB_Z_FLAG)
+ has_m = bool(raw_type & _EWKB_M_FLAG)
+ type_code = raw_type & 0x1FFFFFFF
+
+ if type_code >= 3000:
+ has_z = True
+ has_m = True
+ type_code -= 3000
+ elif type_code >= 2000:
+ has_m = True
+ type_code -= 2000
+ elif type_code >= 1000:
+ has_z = True
+ type_code -= 1000
+
+ return type_code, has_z, has_m
+
+
+def _parse_collection(reader: _WKBReader, accumulator: _EnvelopeAccumulator, little_endian: bool) -> None:
+ num_geometries = reader.read_uint32(little_endian)
+ for _ in range(num_geometries):
+ _parse_geometry(reader, accumulator)
+
+
+def _parse_polygon(
+ reader: _WKBReader,
+ accumulator: _EnvelopeAccumulator,
+ little_endian: bool,
+ has_z: bool,
+ has_m: bool,
+) -> None:
+ num_rings = reader.read_uint32(little_endian)
+ for _ in range(num_rings):
+ _parse_points(reader, accumulator, little_endian, has_z, has_m)
+
+
+def _parse_points(
+ reader: _WKBReader,
+ accumulator: _EnvelopeAccumulator,
+ little_endian: bool,
+ has_z: bool,
+ has_m: bool,
+) -> None:
+ count = reader.read_uint32(little_endian)
+ for _ in range(count):
+ x = reader.read_double(little_endian)
+ y = reader.read_double(little_endian)
+ if has_z and has_m:
+ z = reader.read_double(little_endian)
+ m = reader.read_double(little_endian)
+ elif has_z:
+ z = reader.read_double(little_endian)
+ m = None
+ elif has_m:
+ z = None
+ m = reader.read_double(little_endian)
+ else:
+ z = None
+ m = None
+ accumulator.add_point(x=x, y=y, z=z, m=m)
+
+
+def _parse_point(
+ reader: _WKBReader,
+ accumulator: _EnvelopeAccumulator,
+ little_endian: bool,
+ has_z: bool,
+ has_m: bool,
+) -> None:
+ x = reader.read_double(little_endian)
+ y = reader.read_double(little_endian)
+
+ if has_z and has_m:
+ z = reader.read_double(little_endian)
+ m = reader.read_double(little_endian)
+ elif has_z:
+ z = reader.read_double(little_endian)
+ m = None
+ elif has_m:
+ z = None
+ m = reader.read_double(little_endian)
+ else:
+ z = None
+ m = None
+
+ accumulator.add_point(x=x, y=y, z=z, m=m)
+
+
+def _normalize_longitude(value: float) -> float:
+ normalized = ((value + 180.0) % 360.0) - 180.0
+ if math.isclose(normalized, -180.0) and value > 0:
+ return 180.0
+ return normalized
+
+
+def _to_circle(value: float) -> float:
+ if math.isclose(value, 180.0):
+ return 360.0
+ return value + 180.0
+
+
+def _from_circle(value: float) -> float:
+ if math.isclose(value, 360.0):
+ return 180.0
+ return value - 180.0
+
+
+def _minimal_longitude_interval(longitudes: list[float]) -> tuple[float, float]:
+ # Converting longitude bounds to/from circle coordinates can introduce tiny
+ # floating-point drift at the reconstructed interval edges. Pruning callers
+ # must use geospatial_pruning.bbox_might_match, which applies conservative
+ # boundary tolerance instead of exact edge comparisons.
+ points = sorted({_to_circle(_normalize_longitude(v)) % 360.0 for v in longitudes})
+ if len(points) == 1:
+ lon = _from_circle(points[0])
+ return lon, lon
+
+ max_gap = -1.0
+ max_gap_idx = 0
+ for idx in range(len(points)):
+ current = points[idx]
+ nxt = points[(idx + 1) % len(points)] + (360.0 if idx == len(points) - 1 else 0.0)
+ gap = nxt - current
+ if gap > max_gap:
+ max_gap = gap
+ max_gap_idx = idx
+
+ start = points[(max_gap_idx + 1) % len(points)]
+ end = points[max_gap_idx]
+ return _from_circle(start), _from_circle(end)
+
+
+def _merge_longitude_intervals(left_min: float, left_max: float, right_min: float, right_max: float) -> tuple[float, float]:
+ segments = _interval_to_segments(left_min, left_max) + _interval_to_segments(right_min, right_max)
+ merged = _merge_segments(segments)
+ if not merged:
+ raise ValueError("Cannot merge empty longitude intervals")
+
+ largest_gap = -1.0
+ gap_start = 0.0
+ gap_end = 0.0
+ for idx in range(len(merged)):
+ current_end = merged[idx][1]
+ next_start = merged[(idx + 1) % len(merged)][0] + (360.0 if idx == len(merged) - 1 else 0.0)
+ gap = next_start - current_end
+ if gap > largest_gap:
+ largest_gap = gap
+ gap_start = current_end
+ gap_end = next_start
+
+ if largest_gap <= 1e-12:
+ return -180.0, 180.0
+
+ start = gap_end % 360.0
+ end = gap_start % 360.0
+ return _from_circle(start), _from_circle(end)
+
+
+def _interval_to_segments(x_min: float, x_max: float) -> list[tuple[float, float]]:
+ start = _to_circle(_normalize_longitude(x_min))
+ end = _to_circle(_normalize_longitude(x_max))
+
+ if x_min <= x_max:
+ return [(start, end)]
+ return [(start, 360.0), (0.0, end)]
+
+
+def _merge_segments(segments: list[tuple[float, float]]) -> list[tuple[float, float]]:
+ if not segments:
+ return []
+
+ ordered = sorted(segments, key=lambda pair: pair[0])
+ merged: list[tuple[float, float]] = [ordered[0]]
+ for start, end in ordered[1:]:
+ previous_start, previous_end = merged[-1]
+ if start <= previous_end:
+ merged[-1] = (previous_start, max(previous_end, end))
+ else:
+ merged.append((start, end))
+ return merged
diff --git a/pyiceberg/utils/geospatial_pruning.py b/pyiceberg/utils/geospatial_pruning.py
new file mode 100644
index 0000000000..ee66c6a1bb
--- /dev/null
+++ b/pyiceberg/utils/geospatial_pruning.py
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Geospatial bounding-box pruning helpers.
+
+Returns False ONLY when no row in the file can possibly match; any uncertainty
+returns True. False negatives (wrongly returning False) would cause silent data loss.
+"""
+
+from __future__ import annotations
+
+import math
+
+from pyiceberg.utils.geospatial import deserialize_geospatial_bound, extract_envelope_from_wkb
+
+_SUPPORTED_PREDICATES = {"st-contains", "st-intersects", "st-overlaps", "st-within"}
+_BOUNDARY_ABSOLUTE_EPSILON = 1e-7
+_BOUNDARY_RELATIVE_EPSILON = 1e-12
+
+
+def bbox_might_match(
+ predicate: str,
+ query_wkb: bytes,
+ lower_bound: bytes | None,
+ upper_bound: bytes | None,
+ is_geography: bool,
+) -> bool:
+ if predicate not in _SUPPORTED_PREDICATES:
+ raise ValueError(f"Unsupported geospatial predicate for bbox pruning: {predicate}")
+
+ if lower_bound is None or upper_bound is None:
+ return True
+
+ query_envelope = extract_envelope_from_wkb(query_wkb, is_geography)
+ if query_envelope is None:
+ return True
+
+ lower = deserialize_geospatial_bound(lower_bound)
+ upper = deserialize_geospatial_bound(upper_bound)
+
+ y_overlaps = _scalar_intervals_overlap(lower.y, upper.y, query_envelope.y_min, query_envelope.y_max)
+ if not y_overlaps:
+ return False
+
+ if is_geography:
+ x_overlaps = _longitude_intervals_overlap(lower.x, upper.x, query_envelope.x_min, query_envelope.x_max)
+ else:
+ x_overlaps = _scalar_intervals_overlap(lower.x, upper.x, query_envelope.x_min, query_envelope.x_max)
+
+ return x_overlaps
+
+
+def _scalar_intervals_overlap(left_min: float, left_max: float, right_min: float, right_max: float) -> bool:
+ left_start = min(left_min, left_max)
+ left_end = max(left_min, left_max)
+ right_start = min(right_min, right_max)
+ right_end = max(right_min, right_max)
+
+ # BBox pruning must be conservative: returning False can drop matching rows.
+ # Geography bounds round-trip longitude through circle coordinates, which can
+ # drift stored edges inward by tiny double-precision amounts. Expand only the
+ # boundary comparisons so edge-equal queries remain "might match".
+ epsilon = _interval_boundary_epsilon(left_start, left_end, right_start, right_end)
+ return left_start <= right_end + epsilon and right_start <= left_end + epsilon
+
+
+def _interval_boundary_epsilon(*values: float) -> float:
+ finite_values = (abs(value) for value in values if math.isfinite(value))
+ scale = max(finite_values, default=1.0)
+ return _BOUNDARY_ABSOLUTE_EPSILON + (_BOUNDARY_RELATIVE_EPSILON * scale)
+
+
+def _longitude_intervals_overlap(left_min: float, left_max: float, right_min: float, right_max: float) -> bool:
+ left_segments = _longitude_interval_to_segments(left_min, left_max)
+ right_segments = _longitude_interval_to_segments(right_min, right_max)
+
+ return any(
+ _longitude_segments_overlap(left_segment, right_segment)
+ for left_segment in left_segments
+ for right_segment in right_segments
+ )
+
+
+def _longitude_segments_overlap(left_segment: tuple[float, float], right_segment: tuple[float, float]) -> bool:
+ left_start, left_end = left_segment
+ right_start, right_end = right_segment
+
+ return any(
+ _scalar_intervals_overlap(left_start, left_end, right_start + shift, right_end + shift) for shift in (-360.0, 0.0, 360.0)
+ )
+
+
+def _longitude_interval_to_segments(x_min: float, x_max: float) -> list[tuple[float, float]]:
+ start = _longitude_to_circle(x_min)
+ end = _longitude_to_circle(x_max)
+
+ if _is_full_longitude_interval(x_min, x_max):
+ return [(0.0, 360.0)]
+
+ if x_min <= x_max:
+ return [(start, end)]
+
+ return [(start, 360.0), (0.0, end)]
+
+
+def _is_full_longitude_interval(x_min: float, x_max: float) -> bool:
+ return math.isclose(_normalize_longitude(x_min), -180.0) and math.isclose(_normalize_longitude(x_max), 180.0)
+
+
+def _normalize_longitude(value: float) -> float:
+ normalized = ((value + 180.0) % 360.0) - 180.0
+ if math.isclose(normalized, -180.0) and value > 0:
+ return 180.0
+ return normalized
+
+
+def _longitude_to_circle(value: float) -> float:
+ normalized = _normalize_longitude(value)
+ if math.isclose(normalized, 180.0):
+ return 360.0
+ return normalized + 180.0
diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py
index 137215ebc8..0441bd32f4 100644
--- a/tests/avro/test_file.py
+++ b/tests/avro/test_file.py
@@ -164,6 +164,10 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
del v2_entry["file_sequence_number"]
del v2_entry["data_file"]["content"]
del v2_entry["data_file"]["equality_ids"]
+ # first_row_id (142) and the deletion-vector fields (143-145) are v3-only
+ # DataFile fields, not present in the V1/V2 schema.
+ for v3_only_field in ("first_row_id", "referenced_data_file", "content_offset", "content_size_in_bytes"):
+ v2_entry["data_file"].pop(v3_only_field, None)
# Required in V1
v2_entry["data_file"]["block_size_in_bytes"] = DEFAULT_BLOCK_SIZE
@@ -222,7 +226,12 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None:
fa_entry = next(it)
- assert todict(entry) == fa_entry
+ v2_entry = todict(entry)
+ # first_row_id (142) and the deletion-vector fields (143-145) are v3-only
+ # DataFile fields, not present in the V2 schema.
+ for v3_only_field in ("first_row_id", "referenced_data_file", "content_offset", "content_size_in_bytes"):
+ v2_entry["data_file"].pop(v3_only_field, None)
+ assert v2_entry == fa_entry
@pytest.mark.parametrize("format_version", [1, 2])
diff --git a/tests/table/test_encryption_metadata.py b/tests/table/test_encryption_metadata.py
new file mode 100644
index 0000000000..2be6421a10
--- /dev/null
+++ b/tests/table/test_encryption_metadata.py
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from copy import deepcopy
+from typing import Any
+
+import pytest
+from pydantic import ValidationError
+
+from pyiceberg.table.encryption import EncryptedKey
+from pyiceberg.table.metadata import TableMetadataV3
+from pyiceberg.table.snapshots import Snapshot
+
+
+@pytest.mark.parametrize(
+ "payload",
+ [
+ {},
+ {"key-id": "key-a"},
+ {"encrypted-key-metadata": "ZW5jcnlwdGVkLW1ldGFkYXRh"},
+ ],
+)
+def test_encrypted_key_requires_key_id_and_metadata(payload: dict[str, Any]) -> None:
+ with pytest.raises(ValidationError):
+ EncryptedKey.model_validate(payload)
+
+
+def test_encrypted_key_deserialization_with_all_fields() -> None:
+ encrypted_key = EncryptedKey.model_validate(
+ {
+ "key-id": "key-a",
+ "encrypted-key-metadata": "ZW5jcnlwdGVkLW1ldGFkYXRh",
+ "encrypted-by-id": "root-key",
+ "properties": {"kms": "test", "purpose": "table"},
+ }
+ )
+
+ assert encrypted_key.key_id == "key-a"
+ assert encrypted_key.encrypted_key_metadata == "ZW5jcnlwdGVkLW1ldGFkYXRh"
+ assert encrypted_key.encrypted_by_id == "root-key"
+ assert encrypted_key.properties == {"kms": "test", "purpose": "table"}
+
+
+def test_encrypted_key_deserialization_with_required_fields_only() -> None:
+ encrypted_key = EncryptedKey.model_validate(
+ {
+ "key-id": "key-a",
+ "encrypted-key-metadata": "ZW5jcnlwdGVkLW1ldGFkYXRh",
+ }
+ )
+
+ assert encrypted_key.key_id == "key-a"
+ assert encrypted_key.encrypted_key_metadata == "ZW5jcnlwdGVkLW1ldGFkYXRh"
+ assert encrypted_key.encrypted_by_id is None
+ assert encrypted_key.properties is None
+
+
+def test_encrypted_key_serialization_round_trip_uses_aliases() -> None:
+ encrypted_key = EncryptedKey.model_validate(
+ {
+ "key-id": "key-a",
+ "encrypted-key-metadata": "ZW5jcnlwdGVkLW1ldGFkYXRh",
+ "encrypted-by-id": "root-key",
+ "properties": {"kms": "test"},
+ }
+ )
+
+ serialized = encrypted_key.model_dump_json(by_alias=True)
+
+ assert EncryptedKey.model_validate_json(serialized) == encrypted_key
+ assert '"key-id"' in serialized
+ assert '"encrypted-key-metadata"' in serialized
+ assert "key_id" not in serialized
+ assert "encrypted_key_metadata" not in serialized
+
+
+def test_snapshot_key_id_deserialization_and_serialization() -> None:
+ snapshot = Snapshot.model_validate(
+ {
+ "snapshot-id": 25,
+ "timestamp-ms": 1602638573590,
+ "manifest-list": "s3:/a/b/c.avro",
+ "key-id": "manifest-list-key",
+ }
+ )
+ snapshot_without_key = Snapshot.model_validate(
+ {
+ "snapshot-id": 26,
+ "timestamp-ms": 1602638573591,
+ "manifest-list": "s3:/a/b/d.avro",
+ }
+ )
+
+ assert snapshot.key_id == "manifest-list-key"
+ assert snapshot_without_key.key_id is None
+ assert '"key-id":"manifest-list-key"' in snapshot.model_dump_json(by_alias=True)
+
+
+def test_table_metadata_v3_encryption_keys_deserialization(example_table_metadata_v3: dict[str, Any]) -> None:
+ metadata_dict = deepcopy(example_table_metadata_v3)
+ metadata_dict["encryption-keys"] = [
+ {
+ "key-id": "key-a",
+ "encrypted-key-metadata": "ZW5jcnlwdGVkLW1ldGFkYXRh",
+ }
+ ]
+
+ metadata = TableMetadataV3(**metadata_dict)
+
+ assert metadata.encryption_keys is not None
+ assert metadata.encryption_keys[0].key_id == "key-a"
+ assert metadata.encryption_keys[0].encrypted_key_metadata == "ZW5jcnlwdGVkLW1ldGFkYXRh"
+
+
+def test_table_metadata_v3_without_encryption_keys_deserialization(example_table_metadata_v3: dict[str, Any]) -> None:
+ metadata = TableMetadataV3(**deepcopy(example_table_metadata_v3))
+
+ assert metadata.encryption_keys is None
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 7e64e6e7c0..e789ea98f0 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -1718,6 +1718,7 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) ->
summary=Summary(Operation.APPEND),
schema_id=3,
first_row_id=0,
+ added_rows=10,
)
with pytest.raises(
@@ -1727,6 +1728,47 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) ->
update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))
+def test_add_snapshot_update_fails_without_added_rows(table_v3: Table) -> None:
+ new_snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ sequence_number=200,
+ timestamp_ms=1602638593590,
+ manifest_list="s3:/a/b/c.avro",
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ first_row_id=1,
+ )
+
+ with pytest.raises(
+ ValueError,
+ match="Cannot add snapshot without added rows",
+ ):
+ update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))
+
+
+def test_add_snapshot_update_fails_with_null_table_next_row_id(table_v3: Table) -> None:
+ new_snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ sequence_number=200,
+ timestamp_ms=1602638593590,
+ manifest_list="s3:/a/b/c.avro",
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ first_row_id=1,
+ added_rows=10,
+ )
+
+ with pytest.raises(
+ ValueError,
+ match="Cannot add a snapshot when table next-row-id is null",
+ ):
+ update_table_metadata(
+ table_v3.metadata.model_copy(update={"next_row_id": None}), (AddSnapshotUpdate(snapshot=new_snapshot),)
+ )
+
+
def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None:
new_snapshot = Snapshot(
snapshot_id=25,
diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py
index c163c90626..6a99ef0c2b 100644
--- a/tests/table/test_metadata.py
+++ b/tests/table/test_metadata.py
@@ -184,12 +184,9 @@ def test_serialize_v2(example_table_metadata_v2: dict[str, Any]) -> None:
def test_serialize_v3(example_table_metadata_v3: dict[str, Any]) -> None:
- # Writing will be part of https://github.com/apache/iceberg-python/issues/1551
-
- with pytest.raises(NotImplementedError) as exc_info:
- _ = TableMetadataV3(**example_table_metadata_v3).model_dump_json()
-
- assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value)
+ table_metadata = TableMetadataV3(**example_table_metadata_v3).model_dump_json()
+ expected = """{"location":"s3://bucket/test/location","table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true},{"id":4,"name":"u","type":"unknown","required":true},{"id":5,"name":"ns","type":"timestamp_ns","required":true},{"id":6,"name":"nstz","type":"timestamptz_ns","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"current-snapshot-id":3055729675574597004,"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"}},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000},"main":{"snapshot-id":3055729675574597004,"type":"branch"}},"statistics":[],"partition-statistics":[],"format-version":3,"last-sequence-number":34,"next-row-id":1}"""
+ assert table_metadata == expected
def test_migrate_v1_schemas(example_table_metadata_v1: dict[str, Any]) -> None:
@@ -837,6 +834,7 @@ def test_new_table_metadata_with_v3_schema() -> None:
default_sort_order_id=1,
refs={},
format_version=3,
+ next_row_id=0,
)
assert actual.model_dump() == expected.model_dump()
diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py
index bf8c82014c..0e71adceca 100644
--- a/tests/table/test_puffin.py
+++ b/tests/table/test_puffin.py
@@ -14,12 +14,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import json
+import random
+import zlib
from os import path
+from pathlib import Path
import pytest
from pyroaring import BitMap
-from pyiceberg.table.puffin import _deserialize_bitmap
+from pyiceberg import __version__
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.table.puffin import (
+ DELETION_VECTOR_MAGIC,
+ MAGIC_BYTES,
+ PROPERTY_REFERENCED_DATA_FILE,
+ PuffinFile,
+ PuffinWriter,
+ _deserialize_bitmap,
+)
def _open_file(file: str) -> bytes:
@@ -71,3 +84,268 @@ def test_map_high_vals() -> None:
with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
_ = _deserialize_bitmap(puffin)
+
+
+def _new_writer(tmp_path: Path, created_by: str | None = None) -> tuple[PuffinWriter, Path]:
+ puffin_path = tmp_path / "test.puffin"
+ return PuffinWriter(PyArrowFileIO().new_output(str(puffin_path)), created_by=created_by), puffin_path
+
+
+def test_puffin_round_trip(tmp_path: Path) -> None:
+ # Define some deletion positions for a file
+ deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate
+
+ file_path = "path/to/data.parquet"
+
+ # Write the Puffin file
+ writer, puffin_path = _new_writer(tmp_path, created_by="my-test-app")
+ writer.set_blob(positions=deletions, referenced_data_file=file_path)
+ size = writer.finish()
+
+ # Read the Puffin file back
+ puffin_bytes = puffin_path.read_bytes()
+ assert size == len(puffin_bytes)
+ reader = PuffinFile(puffin_bytes)
+
+ # Assert footer metadata
+ assert reader.footer.properties["created-by"] == "my-test-app"
+ assert len(reader.footer.blobs) == 1
+
+ blob_meta = reader.footer.blobs[0]
+ assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
+ assert blob_meta.properties["cardinality"] == str(len(set(deletions)))
+
+ # Assert the content of deletion vectors
+ read_vectors = reader.to_vector()
+
+ assert file_path in read_vectors
+ assert read_vectors[file_path].to_pylist() == sorted(set(deletions))
+
+
+def test_puffin_round_trip_with_sparse_bitmap_keys(tmp_path: Path) -> None:
+ # High bits 0 and 2 are present while 1 is absent; the writer must emit sorted keys
+ # and the reader pads the missing key with an empty bitmap.
+ positions = [3, (2 << 32) + 4]
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+
+ vectors = PuffinFile(puffin_path.read_bytes()).to_vector()
+ assert vectors["file.parquet"].to_pylist() == positions
+
+
+def test_dv_roundtrip_empty_high_key(tmp_path: Path) -> None:
+ positions = [(3 << 32) + 0, (3 << 32) + 9]
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+
+ vectors = PuffinFile(puffin_path.read_bytes()).to_vector()
+ assert vectors["file.parquet"].to_pylist() == positions
+
+
+def test_dv_roundtrip_large_sparse(tmp_path: Path) -> None:
+ random_generator = random.Random(7321)
+ positions: list[int] = [
+ (1 << 32) - 1,
+ 1 << 32,
+ (2 << 32) - 1,
+ 2 << 32,
+ (7 << 32) + 123,
+ ]
+
+ for key in (0, 1, 3, 7):
+ base = key << 32
+ positions.extend(base + low for low in range(8_000, 14_000))
+ positions.extend(base + random_generator.randrange(0, 0xFFFFFFFF) for _ in range(6_500))
+
+ expected = sorted(set(positions))
+ assert len(expected) > 50_000
+
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+
+ vectors = PuffinFile(puffin_path.read_bytes()).to_vector()
+ assert vectors["file.parquet"].to_pylist() == expected
+
+
+def test_dv_roundtrip_boundary_positions(tmp_path: Path) -> None:
+ positions = [0, 1, (1 << 32) - 1, 1 << 32, (1 << 32) + 1, (2 << 32) + 5]
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+
+ vectors = PuffinFile(puffin_path.read_bytes()).to_vector()
+ assert vectors["file.parquet"].to_pylist() == positions
+
+
+def test_dv_blob_crc_independently_verified(tmp_path: Path) -> None:
+ positions = [0, 7, (1 << 32) - 1, 1 << 32, (4 << 32) + 11]
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+ puffin_bytes = puffin_path.read_bytes()
+
+ footer_payload_size = int.from_bytes(puffin_bytes[-12:-8], "little")
+ footer_bytes = puffin_bytes[-(footer_payload_size + 12) : -12]
+ footer = json.loads(footer_bytes)
+ blob = footer["blobs"][0]
+ blob_bytes = puffin_bytes[blob["offset"] : blob["offset"] + blob["length"]]
+
+ length_prefix = int.from_bytes(blob_bytes[0:4], "big")
+ magic_and_vector = blob_bytes[4 : 4 + length_prefix]
+ stored_crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big")
+
+ assert magic_and_vector[: len(DELETION_VECTOR_MAGIC)] == DELETION_VECTOR_MAGIC
+ assert length_prefix == len(DELETION_VECTOR_MAGIC) + len(magic_and_vector[len(DELETION_VECTOR_MAGIC) :])
+ assert blob["length"] == 4 + length_prefix + 4
+ assert stored_crc == zlib.crc32(magic_and_vector)
+
+
+def test_dv_vector_body_is_portable_croaring(tmp_path: Path) -> None:
+ positions = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, (1 << 32) + 100, (1 << 32) + 101]
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+ puffin_bytes = puffin_path.read_bytes()
+
+ footer_payload_size = int.from_bytes(puffin_bytes[-12:-8], "little")
+ footer_bytes = puffin_bytes[-(footer_payload_size + 12) : -12]
+ footer = json.loads(footer_bytes)
+ blob = footer["blobs"][0]
+ blob_bytes = puffin_bytes[blob["offset"] : blob["offset"] + blob["length"]]
+
+ length_prefix = int.from_bytes(blob_bytes[0:4], "big")
+ assert blob_bytes[4:8] == DELETION_VECTOR_MAGIC
+ vector_body = blob_bytes[8 : 4 + length_prefix]
+
+ cursor = 0
+ count = int.from_bytes(vector_body[cursor : cursor + 8], "little")
+ cursor += 8
+ assert count == 2
+
+ bitmaps_by_key: dict[int, BitMap] = {}
+ keys = []
+ for index in range(count):
+ key = int.from_bytes(vector_body[cursor : cursor + 4], "little")
+ keys.append(key)
+ cursor += 4
+
+ bitmap_bytes = vector_body[cursor:]
+ if index == 0:
+ assert bitmap_bytes[:2] in (b"\x3a\x30", b"\x3b\x30")
+
+ bitmap = BitMap.deserialize(bitmap_bytes)
+ bitmaps_by_key[key] = bitmap
+ cursor += len(bitmap.serialize())
+
+ assert keys == [0, 1]
+ assert keys == sorted(keys)
+ assert set(bitmaps_by_key[0]) == set(range(10))
+ assert set(bitmaps_by_key[1]) == {100, 101}
+ assert cursor == len(vector_body)
+
+
+def test_dv_duplicate_positions_deduped(tmp_path: Path) -> None:
+ positions = [(1 << 32) + 3, 0, (1 << 32) + 3, 7, 0, (1 << 32), 7, 3]
+ expected = sorted(set(positions))
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+
+ reader = PuffinFile(puffin_path.read_bytes())
+ blob = reader.footer.blobs[0]
+ assert blob.properties["cardinality"] == str(len(expected))
+ assert reader.to_vector()["file.parquet"].to_pylist() == expected
+
+
+def test_write_and_read_puffin_file(tmp_path: Path) -> None:
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
+ writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
+ writer.finish()
+
+ reader = PuffinFile(puffin_path.read_bytes())
+
+ assert len(reader.footer.blobs) == 1
+ blob = reader.footer.blobs[0]
+
+ assert blob.properties["referenced-data-file"] == "file2.parquet"
+ assert blob.properties["cardinality"] == "3"
+ assert blob.type == "deletion-vector-v1"
+ # Reserved field id of the row position column (Java MetadataColumns.ROW_POSITION, INT_MAX - 2);
+ # required for Java/Spark interoperability.
+ assert blob.fields == [2147483645]
+ assert blob.snapshot_id == -1
+ assert blob.sequence_number == -1
+ assert blob.compression_codec is None
+
+ vectors = reader.to_vector()
+ assert len(vectors) == 1
+ assert "file1.parquet" not in vectors
+ assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]
+
+
+def test_deletion_vector_blob_framing_is_spec_compliant(tmp_path: Path) -> None:
+ # PuffinFile reads only the serialized vector, skipping the blob's length prefix,
+ # deletion-vector magic and CRC-32. Assert that framing directly at the byte level so
+ # the bytes an external reader (Java/Spark) relies on stay spec-compliant.
+ positions = [0, 1, 5, (1 << 32) + 7]
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.set_blob(positions=positions, referenced_data_file="file.parquet")
+ writer.finish()
+ puffin_bytes = puffin_path.read_bytes()
+
+ # The Puffin file begins with the magic.
+ assert puffin_bytes[:4] == MAGIC_BYTES
+
+ blob = PuffinFile(puffin_bytes).footer.blobs[0]
+ blob_bytes = puffin_bytes[blob.offset : blob.offset + blob.length]
+
+ # Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian),
+ # where the length and CRC-32 both cover the magic bytes plus the vector.
+ length_prefix = int.from_bytes(blob_bytes[0:4], "big")
+ dv_magic = blob_bytes[4:8]
+ vector = blob_bytes[8 : 4 + length_prefix]
+ crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big")
+
+ assert dv_magic == DELETION_VECTOR_MAGIC
+ assert length_prefix == len(dv_magic) + len(vector)
+ assert blob.length == 4 + length_prefix + 4
+ assert crc == zlib.crc32(dv_magic + vector)
+
+
+def test_puffin_file_with_no_blobs(tmp_path: Path) -> None:
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.finish()
+
+ reader = PuffinFile(puffin_path.read_bytes())
+ assert len(reader.footer.blobs) == 0
+ assert len(reader.to_vector()) == 0
+
+
+def test_puffin_writer_default_created_by(tmp_path: Path) -> None:
+ writer, puffin_path = _new_writer(tmp_path)
+ writer.finish()
+
+ reader = PuffinFile(puffin_path.read_bytes())
+ assert reader.footer.properties["created-by"] == f"PyIceberg version {__version__}"
+
+
+def test_set_blob_rejects_negative_positions(tmp_path: Path) -> None:
+ writer, _ = _new_writer(tmp_path)
+ with pytest.raises(ValueError, match="Invalid position: -1"):
+ writer.set_blob(positions=[1, -1], referenced_data_file="file.parquet")
+
+
+def test_set_blob_rejects_empty_positions(tmp_path: Path) -> None:
+ writer, _ = _new_writer(tmp_path)
+ with pytest.raises(ValueError, match="Deletion vector must contain at least one position"):
+ writer.set_blob(positions=[], referenced_data_file="file.parquet")
+
+
+def test_set_blob_rejects_position_exceeding_java_key_range(tmp_path: Path) -> None:
+ writer, _ = _new_writer(tmp_path)
+ with pytest.raises(ValueError, match="Key 2147483648 is too large, max 2147483647"):
+ writer.set_blob(positions=[(2**31) << 32], referenced_data_file="file.parquet")
diff --git a/tests/table/test_v3_deletion_vector_write.py b/tests/table/test_v3_deletion_vector_write.py
new file mode 100644
index 0000000000..d4e0aab62c
--- /dev/null
+++ b/tests/table/test_v3_deletion_vector_write.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""End-to-end tests for committing a v3 deletion vector and reading it back.
+
+These exercise the full integration: write a v3 table, write a Puffin deletion
+vector for one of its data files, emit a DELETES manifest entry carrying the
+``referenced_data_file``/``content_offset``/``content_size_in_bytes`` v3 fields,
+commit it through the snapshot producer, then scan the table and confirm the
+existing reader applies the deletion vector.
+"""
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.sql import SqlCatalog
+from pyiceberg.manifest import DataFileContent, FileFormat
+from pyiceberg.table.puffin import write_deletion_vector
+
+
+@pytest.fixture()
+def catalog(tmp_path): # type: ignore[no-untyped-def]
+ catalog = SqlCatalog(
+ "test",
+ uri=f"sqlite:///{tmp_path}/cat.db",
+ warehouse=f"file://{tmp_path}",
+ )
+ catalog.create_namespace("ns")
+ return catalog
+
+
+def _commit_dv(catalog, table_identifier, positions): # type: ignore[no-untyped-def]
+ table = catalog.load_table(table_identifier)
+ tasks = list(table.scan().plan_files())
+ assert len(tasks) == 1
+ data_file = tasks[0].file
+
+ dv_path = data_file.file_path.replace(".parquet", "-dv.puffin")
+ dv_data_file = write_deletion_vector(
+ output_file=table.io.new_output(dv_path),
+ referenced_data_file=data_file.file_path,
+ positions=positions,
+ partition=data_file.partition,
+ spec_id=data_file.spec_id,
+ )
+ with table.transaction() as txn:
+ with txn.update_snapshot().append_deletion_vectors() as dv:
+ dv.append_data_file(dv_data_file)
+ return data_file, dv_data_file
+
+
+def test_commit_and_read_deletion_vector(catalog) -> None: # type: ignore[no-untyped-def]
+ schema = pa.schema([("id", pa.int64())])
+ table = catalog.create_table("ns.dv", schema=schema, properties={"format-version": "3"})
+ table.append(pa.table({"id": list(range(10))}))
+
+ data_file, dv_data_file = _commit_dv(catalog, "ns.dv", [2, 4, 6])
+
+ # The DV manifest entry models a position-delete file in Puffin format.
+ assert dv_data_file.content == DataFileContent.POSITION_DELETES
+ assert dv_data_file.file_format == FileFormat.PUFFIN
+ assert dv_data_file.referenced_data_file == data_file.file_path
+ # content_offset brackets the framed deletion-vector blob (length+magic+vector+crc)
+ # measured from the start of the Puffin file; size covers the whole framed blob.
+ assert dv_data_file.content_offset == 4
+ assert dv_data_file.content_size_in_bytes > 0
+
+ # The existing reader applies the deletion vector.
+ table = catalog.load_table("ns.dv")
+ ids = sorted(table.scan().to_arrow().column("id").to_pylist())
+ assert ids == [0, 1, 3, 5, 7, 8, 9]
+
+
+def test_deletion_vector_manifest_entry_persisted_on_disk(catalog) -> None: # type: ignore[no-untyped-def]
+ schema = pa.schema([("id", pa.int64())])
+ table = catalog.create_table("ns.dv2", schema=schema, properties={"format-version": "3"})
+ table.append(pa.table({"id": list(range(8))}))
+
+ data_file, _ = _commit_dv(catalog, "ns.dv2", [0, 7])
+
+ # Read the committed manifest entry straight off disk and assert the v3 DV fields are there.
+ table = catalog.load_table("ns.dv2")
+ snapshot = table.current_snapshot()
+ assert snapshot is not None
+
+ delete_entries = []
+ delete_manifests = []
+ for manifest in snapshot.manifests(table.io):
+ for entry in manifest.fetch_manifest_entry(table.io):
+ if entry.data_file.content == DataFileContent.POSITION_DELETES:
+ delete_entries.append(entry.data_file)
+ delete_manifests.append(manifest)
+
+ assert len(delete_entries) == 1
+ dv = delete_entries[0]
+ assert dv.file_format == FileFormat.PUFFIN
+ assert dv.referenced_data_file == data_file.file_path
+ assert dv.content_offset is not None
+ assert dv.content_size_in_bytes is not None
+
+ # The manifest itself is a DELETES manifest (content == 1), not a DATA manifest.
+ from pyiceberg.manifest import ManifestContent
+
+ assert delete_manifests[0].content == ManifestContent.DELETES
+
+ ids = sorted(table.scan().to_arrow().column("id").to_pylist())
+ assert ids == [1, 2, 3, 4, 5, 6]
+
+
+def test_deletion_vector_snapshot_summary_accounts_for_deletes(catalog) -> None: # type: ignore[no-untyped-def]
+ schema = pa.schema([("id", pa.int64())])
+ table = catalog.create_table("ns.dv_summary", schema=schema, properties={"format-version": "3"})
+ table.append(pa.table({"id": list(range(10))}))
+
+ _commit_dv(catalog, "ns.dv_summary", [2, 4, 6])
+
+ table = catalog.load_table("ns.dv_summary")
+ summary = table.current_snapshot().summary
+ assert summary.operation.value == "delete"
+ # The DELETE snapshot must record the committed deletion vector in both the
+ # per-snapshot (added-*) and table-level (total-*) accounting; otherwise engines
+ # and table.inspect undercount delete files forever.
+ assert summary["added-delete-files"] == "1"
+ assert summary["added-position-delete-files"] == "1"
+ assert summary["added-position-deletes"] == "3"
+ assert summary["total-delete-files"] == "1"
+ assert summary["total-position-deletes"] == "3"
+
+
+def test_deletion_vectors_require_v3(catalog) -> None: # type: ignore[no-untyped-def]
+ schema = pa.schema([("id", pa.int64())])
+ table = catalog.create_table("ns.v2", schema=schema, properties={"format-version": "2"})
+ table.append(pa.table({"id": [1, 2, 3]}))
+
+ with pytest.raises(ValueError, match="format version 3"):
+ with table.transaction() as txn:
+ txn.update_snapshot().append_deletion_vectors()
diff --git a/tests/table/test_v3_encryption_metadata_write.py b/tests/table/test_v3_encryption_metadata_write.py
new file mode 100644
index 0000000000..185b332ce4
--- /dev/null
+++ b/tests/table/test_v3_encryption_metadata_write.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""End-to-end test that v3 ``encryption-keys`` metadata round-trips through a real metadata file.
+
+The v3 write path is now enabled (the write gate was lifted in the foundation),
+so ``TableMetadataV3`` serializes through the production ``ToOutputFile``/
+``FromInputFile`` code path. This proves the ``encryption-keys`` field and the
+snapshot ``key-id`` field survive an actual on-disk metadata file written and
+read back with the catalog's own serializers.
+"""
+
+import json
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.sql import SqlCatalog
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.serializers import FromInputFile, ToOutputFile
+from pyiceberg.table.encryption import EncryptedKey
+
+
+@pytest.fixture()
+def catalog(tmp_path): # type: ignore[no-untyped-def]
+ catalog = SqlCatalog(
+ "test",
+ uri=f"sqlite:///{tmp_path}/cat.db",
+ warehouse=f"file://{tmp_path}",
+ )
+ catalog.create_namespace("ns")
+ return catalog
+
+
+def test_encryption_keys_round_trip_through_real_v3_metadata_file(catalog, tmp_path) -> None: # type: ignore[no-untyped-def]
+ schema = pa.schema([("id", pa.int64())])
+ table = catalog.create_table("ns.enc", schema=schema, properties={"format-version": "3"})
+ table.append(pa.table({"id": [1, 2, 3]}))
+
+ metadata = table.metadata
+ assert metadata.format_version == 3
+
+ enriched = metadata.model_copy(
+ update={
+ "encryption_keys": [
+ EncryptedKey(
+ key_id="key-1",
+ encrypted_key_metadata="ZW5jcnlwdGVkLW1ldGE=",
+ encrypted_by_id="kms-root",
+ properties={"scheme": "AES-GCM"},
+ ),
+ EncryptedKey(key_id="key-2", encrypted_key_metadata="c2Vjb25k"),
+ ]
+ }
+ )
+
+ io = PyArrowFileIO()
+ metadata_path = f"file://{tmp_path}/v3-with-encryption.metadata.json"
+ ToOutputFile.table_metadata(enriched, io.new_output(metadata_path), overwrite=True)
+
+ # The raw bytes on disk really contain the v3 encryption-keys structure with spec aliases.
+ raw = io.new_input(metadata_path).open().read().decode("utf-8")
+ payload = json.loads(raw)
+ assert "encryption-keys" in payload
+ assert payload["encryption-keys"][0]["key-id"] == "key-1"
+ assert payload["encryption-keys"][0]["encrypted-key-metadata"] == "ZW5jcnlwdGVkLW1ldGE="
+ assert payload["encryption-keys"][0]["encrypted-by-id"] == "kms-root"
+ assert payload["encryption-keys"][0]["properties"] == {"scheme": "AES-GCM"}
+ assert payload["encryption-keys"][1]["key-id"] == "key-2"
+ # encrypted-by-id / properties are optional and omitted when unset
+ assert "encrypted-by-id" not in payload["encryption-keys"][1]
+
+ # Read the file back through the production deserializer.
+ parsed = FromInputFile.table_metadata(io.new_input(metadata_path))
+ assert parsed.format_version == 3
+ assert parsed.encryption_keys is not None
+ assert len(parsed.encryption_keys) == 2
+ assert parsed.encryption_keys[0] == EncryptedKey(
+ key_id="key-1",
+ encrypted_key_metadata="ZW5jcnlwdGVkLW1ldGE=",
+ encrypted_by_id="kms-root",
+ properties={"scheme": "AES-GCM"},
+ )
+ assert parsed.encryption_keys[1].key_id == "key-2"
+ assert parsed.encryption_keys[1].encrypted_by_id is None
+
+
+def test_snapshot_key_id_round_trip_through_real_v3_metadata_file(catalog, tmp_path) -> None: # type: ignore[no-untyped-def]
+ schema = pa.schema([("id", pa.int64())])
+ table = catalog.create_table("ns.enc_snap", schema=schema, properties={"format-version": "3"})
+ table.append(pa.table({"id": [1, 2, 3]}))
+
+ metadata = table.metadata
+ snapshot = metadata.snapshots[0]
+ keyed_snapshot = snapshot.model_copy(update={"key_id": "snap-key-7"})
+ enriched = metadata.model_copy(update={"snapshots": [keyed_snapshot]})
+
+ io = PyArrowFileIO()
+ metadata_path = f"file://{tmp_path}/v3-snap-keyid.metadata.json"
+ ToOutputFile.table_metadata(enriched, io.new_output(metadata_path), overwrite=True)
+
+ raw = json.loads(io.new_input(metadata_path).open().read().decode("utf-8"))
+ assert raw["snapshots"][0]["key-id"] == "snap-key-7"
+
+ parsed = FromInputFile.table_metadata(io.new_input(metadata_path))
+ assert parsed.snapshots[0].key_id == "snap-key-7"
diff --git a/tests/table/test_v3_geo_bounds_write.py b/tests/table/test_v3_geo_bounds_write.py
new file mode 100644
index 0000000000..9908ff0b6e
--- /dev/null
+++ b/tests/table/test_v3_geo_bounds_write.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Tests for the integrated v3 geospatial bounds write helper + bbox pruning.
+
+``write_file`` now runs ``_geospatial_bounds_from_arrow`` over geometry/geography
+WKB columns and merges the resulting ``lower_bounds``/``upper_bounds`` into the
+data file. These tests exercise that integrated helper against real Arrow tables,
+confirm the bounds serialize per spec, and confirm ``bbox_might_match`` prunes
+and keeps files correctly against the persisted bounds (never a false negative).
+
+Documented gap (see integration.md): a full ``Table.append`` of a geo column is
+blocked on geoarrow-pyarrow <-> Iceberg schema/parquet interop (the WKB extension
+type is neither inferred by ``visit_pyarrow`` nor written cleanly by the Parquet
+writer). The bounds-computation and pruning code integrated here is fully real.
+"""
+
+import struct
+
+import pyarrow as pa
+
+from pyiceberg.io.pyarrow import _geospatial_bounds_from_arrow
+from pyiceberg.schema import Schema
+from pyiceberg.types import GeographyType, GeometryType, LongType, NestedField
+from pyiceberg.utils.geospatial import deserialize_geospatial_bound
+from pyiceberg.utils.geospatial_pruning import bbox_might_match
+
+
+def _wkb_point(x: float, y: float) -> bytes:
+ return struct.pack(" None:
+ schema = Schema(
+ NestedField(1, "id", LongType(), required=False),
+ NestedField(2, "geom", GeometryType(), required=False),
+ )
+ arrow = pa.table(
+ {
+ "id": pa.array([1, 2, 3], pa.int64()),
+ "geom": pa.array(
+ [_wkb_point(0.0, 0.0), _wkb_point(10.0, 20.0), _wkb_point(-5.0, 3.0)],
+ pa.large_binary(),
+ ),
+ }
+ )
+
+ lower, upper = _geospatial_bounds_from_arrow(schema, arrow)
+
+ assert set(lower) == {2}
+ assert set(upper) == {2}
+
+ lower_bound = deserialize_geospatial_bound(lower[2])
+ upper_bound = deserialize_geospatial_bound(upper[2])
+ assert (lower_bound.x, lower_bound.y) == (-5.0, 0.0)
+ assert (upper_bound.x, upper_bound.y) == (10.0, 20.0)
+
+ # Pruning against the exact serialized bounds.
+ assert bbox_might_match("st-intersects", _wkb_point(100.0, 100.0), lower[2], upper[2], is_geography=False) is False
+ assert bbox_might_match("st-intersects", _wkb_point(5.0, 5.0), lower[2], upper[2], is_geography=False) is True
+ # The file's own corner must never be a false negative.
+ assert bbox_might_match("st-intersects", _wkb_point(-5.0, 0.0), lower[2], upper[2], is_geography=False) is True
+ assert bbox_might_match("st-intersects", _wkb_point(10.0, 20.0), lower[2], upper[2], is_geography=False) is True
+
+
+def test_geospatial_bounds_skips_nulls_and_nongeo() -> None:
+ schema = Schema(
+ NestedField(1, "id", LongType(), required=False),
+ NestedField(2, "geom", GeometryType(), required=False),
+ )
+ arrow = pa.table(
+ {
+ "id": pa.array([1, 2, 3], pa.int64()),
+ "geom": pa.array([_wkb_point(1.0, 2.0), None, _wkb_point(3.0, 4.0)], pa.large_binary()),
+ }
+ )
+
+ lower, upper = _geospatial_bounds_from_arrow(schema, arrow)
+ # Non-geo column (id) gets no geo bounds.
+ assert set(lower) == {2}
+ lower_bound = deserialize_geospatial_bound(lower[2])
+ upper_bound = deserialize_geospatial_bound(upper[2])
+ assert (lower_bound.x, lower_bound.y) == (1.0, 2.0)
+ assert (upper_bound.x, upper_bound.y) == (3.0, 4.0)
+
+
+def test_geospatial_bounds_empty_when_all_null() -> None:
+ schema = Schema(NestedField(2, "geom", GeometryType(), required=False))
+ arrow = pa.table({"geom": pa.array([None, None], pa.large_binary())})
+
+ lower, upper = _geospatial_bounds_from_arrow(schema, arrow)
+ assert lower == {}
+ assert upper == {}
+
+
+def test_geospatial_bounds_geography_is_flagged() -> None:
+ schema = Schema(NestedField(5, "geo", GeographyType(), required=False))
+ arrow = pa.table({"geo": pa.array([_wkb_point(-170.0, 10.0), _wkb_point(170.0, -10.0)], pa.large_binary())})
+
+ lower, upper = _geospatial_bounds_from_arrow(schema, arrow)
+ assert set(lower) == {5}
+ # geography longitude bounds use the antimeridian-minimal interval; a query
+ # crossing the seam must still be kept against this wrapped bbox.
+ assert bbox_might_match("st-intersects", _wkb_point(180.0, 0.0), lower[5], upper[5], is_geography=True) is True
diff --git a/tests/table/test_v3_row_lineage.py b/tests/table/test_v3_row_lineage.py
new file mode 100644
index 0000000000..8eded20b46
--- /dev/null
+++ b/tests/table/test_v3_row_lineage.py
@@ -0,0 +1,410 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Acceptance tests for v3 write gate + row-lineage assignment.
+
+T1-foundation: exercises the real local write path end to end, asserting that
+v3 row lineage (next-row-id / first-row-id / added-rows) is correct and
+monotonic across multiple commits, that next-row-id never silently falls back
+to None, and that the full v3 metadata round-trips through JSON.
+"""
+
+import json
+from pathlib import Path
+from typing import cast
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.manifest import ManifestContent
+from pyiceberg.schema import Schema
+from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV3
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+
+@pytest.fixture
+def v3_catalog(tmp_path: Path) -> Catalog:
+ return InMemoryCatalog("t1", warehouse=f"file://{tmp_path}")
+
+
+SCHEMA = Schema(
+ NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
+ NestedField(field_id=2, name="name", field_type=StringType(), required=False),
+)
+
+ARROW_SCHEMA = pa.schema(
+ [
+ pa.field("id", pa.int32(), nullable=True),
+ pa.field("name", pa.string(), nullable=True),
+ ]
+)
+
+
+def _batch(ids: list[int]) -> pa.Table:
+ return pa.Table.from_pylist(
+ [{"id": i, "name": f"row-{i}"} for i in ids],
+ schema=ARROW_SCHEMA,
+ )
+
+
+def test_v3_table_creation_starts_next_row_id_at_zero(v3_catalog: Catalog) -> None:
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+ assert tbl.metadata.format_version == 3
+ assert tbl.metadata.next_row_id == 0
+
+
+def test_v3_append_twice_row_lineage_is_monotonic(v3_catalog: Catalog) -> None:
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+
+ assert tbl.metadata.next_row_id == 0
+ assert tbl.metadata.next_row_id is not None
+
+ # First append: 3 rows.
+ tbl.append(_batch([1, 2, 3]))
+ tbl = v3_catalog.load_table("ns.t")
+
+ snap1 = tbl.metadata.current_snapshot()
+ assert snap1 is not None
+ assert snap1.first_row_id == 0, "first snapshot must start assigning at row id 0"
+ assert snap1.added_rows == 3, "added_rows must reflect the 3 rows written, not None"
+ assert tbl.metadata.next_row_id == 3, "next_row_id must advance by added rows"
+ assert tbl.metadata.next_row_id is not None, "next_row_id must NEVER fall back to None on v3"
+
+ # Second append: 2 rows.
+ tbl.append(_batch([4, 5]))
+ tbl = v3_catalog.load_table("ns.t")
+
+ snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0)
+ snap2 = snaps[-1]
+ assert snap2.first_row_id == 3, "second snapshot must start where the first left off"
+ assert snap2.added_rows == 2
+ assert tbl.metadata.next_row_id == 5, "next_row_id must be strictly monotonically increasing"
+
+ # Monotonicity across all snapshots: each first_row_id + added_rows chains.
+ running = 0
+ for s in snaps:
+ assert s.first_row_id is not None
+ assert s.added_rows is not None
+ assert s.first_row_id == running, f"gap/overlap in row-id assignment at snapshot {s.snapshot_id}"
+ running += s.added_rows
+ assert running == tbl.metadata.next_row_id
+
+
+def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog, monkeypatch: pytest.MonkeyPatch) -> None:
+ """Merge-append must actually MERGE manifests and must NOT double-count existing rows.
+
+ This is the #3070 double-count regression guard. We instrument the merge manager so the
+ test fails if no manifest merge ever happens (the historical bug: v3 merging was silently
+ disabled by a descending-vs-ascending ordering check, so this fix was dead code).
+ """
+ from pyiceberg.table.update import snapshot as snapshot_module
+
+ merge_calls = {"count": 0}
+ original_create = snapshot_module._ManifestMergeManager._create_manifest
+
+ def _counting_create(self, spec_id, manifest_bin): # type: ignore[no-untyped-def]
+ merge_calls["count"] += 1
+ return original_create(self, spec_id, manifest_bin)
+
+ monkeypatch.setattr(snapshot_module._ManifestMergeManager, "_create_manifest", _counting_create)
+
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table(
+ "ns.t",
+ schema=SCHEMA,
+ properties={
+ "format-version": "3",
+ "commit.manifest-merge.enabled": "true",
+ "commit.manifest.min-count-to-merge": "1",
+ },
+ )
+
+ for ids in ([1, 2, 3], [4, 5], [6, 7, 8, 9]):
+ tbl.append(_batch(ids))
+ tbl = v3_catalog.load_table("ns.t")
+
+ # The merge path MUST have run at least once; otherwise the double-count fix is dead code.
+ assert merge_calls["count"] > 0, "v3 manifest merge never ran — merging is silently disabled"
+
+ # The data manifests must have been compacted (3 appends -> fewer than 3 DATA manifests).
+ snap = tbl.metadata.current_snapshot()
+ assert snap is not None
+ data_manifests = [m for m in snap.manifests(tbl.io) if m.content == ManifestContent.DATA]
+ assert len(data_manifests) < 3, "manifests were not actually merged"
+
+ snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0)
+ added_rows = [s.added_rows for s in snaps]
+
+ # next_row_id == 9 proves no double-counting (9 real rows, not 17).
+ assert tbl.metadata.next_row_id == 9
+ assert added_rows == [3, 2, 4]
+ assert all(rows is not None for rows in added_rows)
+ assert sum(rows for rows in added_rows if rows is not None) == tbl.metadata.next_row_id
+
+ # The merged manifests must tile [0, 9) exactly, with each data file's row range coherent.
+ assigned = sorted(
+ ((m.first_row_id, cast(int, m.existing_rows_count) + cast(int, m.added_rows_count)) for m in data_manifests),
+ key=lambda pair: cast(int, pair[0]),
+ )
+ cursor = 0
+ for first_row_id, rows in assigned:
+ assert first_row_id == cursor, "merged manifest row-id ranges have a gap/overlap"
+ cursor += rows
+ assert cursor == 9
+
+ # The data is still fully readable and correct after merging.
+ actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist())
+ assert actual_ids == [1, 2, 3, 4, 5, 6, 7, 8, 9]
+
+
+def test_v3_manifest_carries_first_row_id(v3_catalog: Catalog) -> None:
+ """The data manifest in the manifest list must be assigned a first_row_id per spec."""
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+ tbl.append(_batch([1, 2, 3]))
+ tbl = v3_catalog.load_table("ns.t")
+
+ snap = tbl.metadata.current_snapshot()
+ assert snap is not None
+ manifests = snap.manifests(tbl.io)
+ data_manifests = [m for m in manifests if m.content == ManifestContent.DATA]
+ assert len(data_manifests) >= 1
+ # The first (only) data manifest must carry the snapshot's first_row_id (0).
+ assert data_manifests[0].first_row_id == 0
+
+
+def test_v3_metadata_round_trips_through_json(v3_catalog: Catalog) -> None:
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+ tbl.append(_batch([1, 2, 3]))
+ tbl.append(_batch([4, 5]))
+ tbl = v3_catalog.load_table("ns.t")
+
+ meta = tbl.metadata
+ assert isinstance(meta, TableMetadataV3)
+
+ dumped = meta.model_dump_json()
+ reparsed = TableMetadataUtil.parse_raw(dumped)
+ assert isinstance(reparsed, TableMetadataV3)
+ assert reparsed.next_row_id == meta.next_row_id == 5
+ # Round-trip equality of the model.
+ assert reparsed == meta
+ # The serialized JSON must include next-row-id.
+ assert json.loads(dumped)["next-row-id"] == 5
+
+
+def _data_file_row_ids(tbl: object) -> list[tuple[int | None, int | None, int]]:
+ """Return (manifest.first_row_id, data_file.first_row_id (field 142), record_count) for DATA entries."""
+ snap = tbl.metadata.current_snapshot() # type: ignore[attr-defined]
+ out: list[tuple[int | None, int | None, int]] = []
+ for m in snap.manifests(tbl.io): # type: ignore[attr-defined]
+ if m.content != ManifestContent.DATA:
+ continue
+ for entry in m.fetch_manifest_entry(tbl.io, discard_deleted=True): # type: ignore[attr-defined]
+ out.append((m.first_row_id, entry.data_file.first_row_id, entry.data_file.record_count))
+ return out
+
+
+def test_v3_whole_file_delete_does_not_renumber_surviving_rows(v3_catalog: Catalog) -> None:
+ """Copy-on-write whole-file delete must NEVER re-number the surviving rows.
+
+ Two separate data files are written (row ids [0,1,2] and [3,4,5]). Deleting the whole
+ first file (predicate aligned to a full file) must:
+ - leave next_row_id unchanged (0 new rows assigned), and
+ - keep the surviving file's row-id lineage intact (manifest first_row_id == 3, and the
+ materialized data-file field-142 first_row_id == 3) — NOT renumbered to 0.
+ This asserts on the _row_id (field 142 / manifest first_row_id), not the user `id` column.
+ """
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+
+ # Two separate files: ids 0,1,2 (row ids 0-2) then ids 10,11,12 (row ids 3-5).
+ tbl.append(_batch([0, 1, 2]))
+ tbl = v3_catalog.load_table("ns.t")
+ tbl.append(_batch([10, 11, 12]))
+ tbl = v3_catalog.load_table("ns.t")
+ assert tbl.metadata.next_row_id == 6
+
+ before = sorted(_data_file_row_ids(tbl), key=lambda triple: triple[0] or 0)
+ # manifest first_row_ids should be 0 and 3.
+ assert [triple[0] for triple in before] == [0, 3]
+
+ # Delete the entire first file (all ids < 5). This is a metadata-only (whole-file) delete.
+ tbl.delete(delete_filter="id < 5")
+ tbl = v3_catalog.load_table("ns.t")
+
+ # next_row_id must NOT advance — zero rows were newly assigned.
+ assert tbl.metadata.next_row_id == 6, "whole-file delete must not assign new row ids"
+ delete_snap = tbl.metadata.current_snapshot()
+ assert delete_snap is not None
+ assert delete_snap.added_rows == 0, "a delete must report 0 added rows, not re-numbered survivors"
+
+ # The surviving file must KEEP its original row-id lineage (manifest first_row_id == 3).
+ surviving = _data_file_row_ids(tbl)
+ assert len(surviving) == 1
+ surviving_manifest_frid, surviving_datafile_frid, surviving_rows = surviving[0]
+ assert surviving_rows == 3
+ assert surviving_manifest_frid == 3, "surviving rows must keep their original first_row_id (3), not be renumbered"
+
+ # Data correctness.
+ actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist())
+ assert actual_ids == [10, 11, 12]
+
+
+def test_v3_whole_file_delete_with_two_survivors_renumbers_none(v3_catalog: Catalog) -> None:
+ """Three files; delete the middle file wholesale; the two survivors keep their row ids."""
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+ tbl.append(_batch([0, 1])) # row ids 0-1
+ tbl = v3_catalog.load_table("ns.t")
+ tbl.append(_batch([10, 11])) # row ids 2-3 (deleted)
+ tbl = v3_catalog.load_table("ns.t")
+ tbl.append(_batch([20, 21])) # row ids 4-5
+ tbl = v3_catalog.load_table("ns.t")
+ assert tbl.metadata.next_row_id == 6
+
+ # delete the middle file (ids 10,11) wholesale using a bound-provable range predicate
+ # (min=10,max=11 fully inside [10,20)), so this is a metadata-only whole-file delete.
+ tbl.delete(delete_filter="id >= 10 and id < 20")
+ tbl = v3_catalog.load_table("ns.t")
+
+ assert tbl.metadata.next_row_id == 6, "no new row ids on a whole-file delete"
+ frids = sorted(cast(int, triple[0]) for triple in _data_file_row_ids(tbl))
+ # survivors must keep first_row_ids 0 and 4 (NOT renumbered to 0 and 2).
+ assert frids == [0, 4], "survivors were re-numbered after deleting the middle file"
+ actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist())
+ assert actual_ids == [0, 1, 20, 21]
+
+
+def test_v3_whole_file_delete_in_shared_manifest_preserves_survivor_row_ids(v3_catalog: Catalog) -> None:
+ """When two data files share ONE manifest and one is deleted wholesale, the survivor must
+ keep its row-id lineage.
+
+ This is the strongest delete-lineage guard: the surviving file is REWRITTEN into a new
+ manifest (the shared source manifest is dropped), so without preserving lineage the
+ manifest-list writer renumbers the survivor (the historical bug: next_row_id jumped 6->9,
+ survivor block frid 0->6). The fix materializes the survivor's absolute _row_id into
+ DataFile field 142 and inherits the source manifest's first_row_id.
+ """
+ import itertools
+ import uuid
+
+ from pyiceberg.io.pyarrow import _dataframe_to_data_files
+
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+
+ # Two data files in a SINGLE manifest (one fast-append commit): row ids 0-2 and 3-5.
+ with tbl.transaction() as txn:
+ with txn.update_snapshot().fast_append() as fast_append:
+ for ids in ([0, 1, 2], [100, 101, 102]):
+ for data_file in _dataframe_to_data_files(
+ io=tbl.io,
+ df=_batch(ids),
+ table_metadata=txn.table_metadata,
+ write_uuid=uuid.uuid4(),
+ counter=itertools.count(),
+ ):
+ fast_append.append_data_file(data_file)
+ tbl = v3_catalog.load_table("ns.t")
+ assert tbl.metadata.next_row_id == 6
+ # one shared manifest with first_row_id == 0
+ assert [m[0] for m in _data_file_row_ids(tbl)] == [0, 0]
+
+ # Whole-file delete the first file (ids 0,1,2): bound-provable (max=2 < 5).
+ tbl.delete(delete_filter="id < 5")
+ tbl = v3_catalog.load_table("ns.t")
+
+ # next_row_id must NOT advance and no new rows are added.
+ assert tbl.metadata.next_row_id == 6, "survivor was re-numbered (next_row_id advanced)"
+ delete_snap = tbl.metadata.current_snapshot()
+ assert delete_snap is not None
+ assert delete_snap.added_rows == 0
+
+ surviving = _data_file_row_ids(tbl)
+ assert len(surviving) == 1
+ manifest_frid, datafile_frid, rows = surviving[0]
+ assert rows == 3
+ # The rewritten manifest inherits the source manifest's first_row_id (0)...
+ assert manifest_frid == 0, "rewritten manifest must inherit the source manifest first_row_id"
+ # ...and the survivor's absolute _row_id (3) is materialized into field 142.
+ assert datafile_frid == 3, "survivor's _row_id (field 142) must be materialized to its original value (3)"
+
+ assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [100, 101, 102]
+
+
+def test_v3_partial_rewrite_delete_fails_loudly(v3_catalog: Catalog) -> None:
+ """A copy-on-write delete that needs to REWRITE a data file must fail loudly on v3.
+
+ Preserving _row_id lineage across a physical rewrite needs materialized per-row _row_id
+ columns, which PyIceberg does not have. Rather than silently re-numbering survivors, the
+ v3 path raises NotImplementedError.
+ """
+ v3_catalog.create_namespace("ns")
+ tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"})
+ # Single file containing 0..5; deleting a subset forces a partial rewrite.
+ tbl.append(_batch([0, 1, 2, 3, 4, 5]))
+ tbl = v3_catalog.load_table("ns.t")
+
+ with pytest.raises(NotImplementedError, match="copy-on-write delete"):
+ tbl.delete(delete_filter="id in (2, 3)")
+
+ # The table state is unchanged (no corruption, no renumbering).
+ tbl = v3_catalog.load_table("ns.t")
+ assert tbl.metadata.next_row_id == 6
+ assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 2, 3, 4, 5]
+
+
+def test_v2_partial_rewrite_delete_still_works(tmp_path: Path) -> None:
+ """The loud v3 failure must NOT regress v2 copy-on-write deletes."""
+ cat = InMemoryCatalog("t1v2", warehouse=f"file://{tmp_path}")
+ cat.create_namespace("ns")
+ tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"})
+ tbl.append(_batch([0, 1, 2, 3, 4, 5]))
+ tbl = cat.load_table("ns.t")
+ tbl.delete(delete_filter="id in (2, 3)")
+ tbl = cat.load_table("ns.t")
+ assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 4, 5]
+
+
+def test_v3_upgrade_from_v2_seeds_next_row_id(tmp_path: Path) -> None:
+ """v2 -> v3 upgrade must be reachable via the public API and seed next_row_id = 0."""
+ cat = InMemoryCatalog("t1up", warehouse=f"file://{tmp_path}")
+ cat.create_namespace("ns")
+ tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"})
+ assert tbl.metadata.format_version == 2
+ # v2 metadata does not carry next-row-id at all.
+ assert getattr(tbl.metadata, "next_row_id", None) is None
+
+ with tbl.transaction() as txn:
+ txn.upgrade_table_version(3)
+ tbl = cat.load_table("ns.t")
+
+ assert tbl.metadata.format_version == 3
+ assert tbl.metadata.next_row_id == 0, "upgrade to v3 must seed next_row_id at 0"
+
+ # And the upgraded table must support a v3 append with correct row lineage.
+ tbl.append(_batch([1, 2, 3]))
+ tbl = cat.load_table("ns.t")
+ snap = tbl.metadata.current_snapshot()
+ assert snap is not None
+ assert snap.first_row_id == 0
+ assert snap.added_rows == 3
+ assert tbl.metadata.next_row_id == 3
diff --git a/tests/utils/test_geospatial.py b/tests/utils/test_geospatial.py
new file mode 100644
index 0000000000..3ac3364c64
--- /dev/null
+++ b/tests/utils/test_geospatial.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import math
+import struct
+
+import pytest
+
+from pyiceberg.utils.geospatial import (
+ GeospatialBound,
+ deserialize_geospatial_bound,
+ extract_envelope_from_wkb,
+ merge_envelopes,
+ serialize_geospatial_bound,
+)
+
+
+def test_geospatial_bound_serde_xy() -> None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0))
+ assert len(raw) == 16
+ bound = deserialize_geospatial_bound(raw)
+ assert bound.x == 10.0
+ assert bound.y == 20.0
+ assert bound.z is None
+ assert bound.m is None
+
+
+def test_geospatial_bound_serde_xyz() -> None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, z=30.0))
+ assert len(raw) == 24
+ bound = deserialize_geospatial_bound(raw)
+ assert bound.x == 10.0
+ assert bound.y == 20.0
+ assert bound.z == 30.0
+ assert bound.m is None
+
+
+def test_geospatial_bound_serde_xym() -> None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, m=40.0))
+ assert len(raw) == 32
+ x, y, z, m = struct.unpack(" None:
+ raw = serialize_geospatial_bound(GeospatialBound(x=10.0, y=20.0, z=30.0, m=40.0))
+ assert len(raw) == 32
+ bound = deserialize_geospatial_bound(raw)
+ assert bound.x == 10.0
+ assert bound.y == 20.0
+ assert bound.z == 30.0
+ assert bound.m == 40.0
+
+
+def test_geospatial_bound_serde_rejects_ambiguous_nan_z_with_m() -> None:
+ with pytest.raises(ValueError, match="NaN z"):
+ serialize_geospatial_bound(GeospatialBound(x=1.0, y=2.0, z=math.nan, m=5.0))
+
+ xym = GeospatialBound(x=1.0, y=2.0, z=None, m=5.0)
+ assert deserialize_geospatial_bound(serialize_geospatial_bound(xym)) == xym
+
+ xyzm = GeospatialBound(x=1.0, y=2.0, z=3.0, m=5.0)
+ assert deserialize_geospatial_bound(serialize_geospatial_bound(xyzm)) == xyzm
+
+
+def test_extract_envelope_geometry() -> None:
+ # LINESTRING(170 0, -170 1)
+ wkb = struct.pack(" None:
+ # LINESTRING(170 0, -170 1)
+ wkb = struct.pack(" envelope.x_max
+ assert envelope.x_min == 170.0
+ assert envelope.x_max == -170.0
+ assert envelope.y_min == 0.0
+ assert envelope.y_max == 1.0
+
+
+def test_extract_envelope_xyzm_linestring() -> None:
+ # LINESTRING ZM (0 1 2 3, 4 5 6 7)
+ wkb = struct.pack(" None:
+ left = extract_envelope_from_wkb(struct.pack(" merged.x_max
+ assert merged.x_min == 170.0
+ assert merged.x_max == -120.0
+ assert merged.y_min == 0.0
+ assert merged.y_max == 3.0
diff --git a/tests/utils/test_geospatial_pruning.py b/tests/utils/test_geospatial_pruning.py
new file mode 100644
index 0000000000..5e633fe45c
--- /dev/null
+++ b/tests/utils/test_geospatial_pruning.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import struct
+from random import Random
+
+import pytest
+
+from pyiceberg.utils.geospatial import (
+ GeospatialBound,
+ GeospatialStatsAggregator,
+ extract_envelope_from_wkb,
+ serialize_geospatial_bound,
+)
+from pyiceberg.utils.geospatial_pruning import bbox_might_match
+
+_SUPPORTED_PREDICATES = ("st-contains", "st-intersects", "st-overlaps", "st-within")
+
+
+def _point_wkb(x: float, y: float) -> bytes:
+ return struct.pack(" bytes:
+ return struct.pack(">BIdd", 0, 1, x, y)
+
+
+def _point_xyzm_wkb(x: float, y: float, z: float, m: float) -> bytes:
+ return struct.pack(" bytes:
+ ordinates = [ordinate for point in points for ordinate in point]
+ return struct.pack(" bytes:
+ points = [
+ (x_min, y_min),
+ (x_max, y_min),
+ (x_max, y_max),
+ (x_min, y_max),
+ (x_min, y_min),
+ ]
+ ordinates = [ordinate for point in points for ordinate in point]
+ return struct.pack(" bytes:
+ return serialize_geospatial_bound(GeospatialBound(x=x, y=y, z=z, m=m))
+
+
+def _bounds(x_min: float, y_min: float, x_max: float, y_max: float) -> tuple[bytes, bytes]:
+ return _bound(x_min, y_min), _bound(x_max, y_max)
+
+
+def _stats_bounds(points: list[tuple[float, float]], is_geography: bool) -> tuple[bytes, bytes]:
+ aggregator = GeospatialStatsAggregator(is_geography=is_geography)
+ for x, y in points:
+ aggregator.add(_point_wkb(x, y))
+
+ lower_bound = aggregator.serialized_min()
+ upper_bound = aggregator.serialized_max()
+ assert lower_bound is not None
+ assert upper_bound is not None
+ return lower_bound, upper_bound
+
+
+@pytest.mark.parametrize("predicate", ["st-intersects", "st-overlaps", "st-contains"])
+def test_bbox_pruning_disjoint_geometry_returns_false(predicate: str) -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(20.0, 20.0, 30.0, 30.0)
+
+ assert not bbox_might_match(predicate, query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_overlapping_geometry_returns_true() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(5.0, 5.0, 15.0, 15.0)
+
+ assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+@pytest.mark.parametrize(("lower_bound", "upper_bound"), [(None, _bound(10.0, 10.0)), (_bound(0.0, 0.0), None)])
+def test_bbox_pruning_missing_bounds_returns_true(lower_bound: bytes | None, upper_bound: bytes | None) -> None:
+ assert bbox_might_match("st-intersects", _point_wkb(100.0, 100.0), lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_within_disjoint_returns_false() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(20.0, 20.0, 30.0, 30.0)
+
+ assert not bbox_might_match("st-within", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_within_overlapping_returns_true() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _polygon_bbox_wkb(5.0, 5.0, 15.0, 15.0)
+
+ assert bbox_might_match("st-within", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_antimeridian_wrapped_file_matches_query_inside_interval() -> None:
+ lower_bound, upper_bound = _bounds(170.0, -10.0, -170.0, 10.0)
+
+ assert bbox_might_match("st-intersects", _point_wkb(178.0, 0.0), lower_bound, upper_bound, is_geography=True)
+
+
+def test_bbox_pruning_antimeridian_wrapped_file_prunes_query_outside_interval() -> None:
+ lower_bound, upper_bound = _bounds(170.0, -10.0, -170.0, 10.0)
+
+ assert not bbox_might_match("st-intersects", _point_wkb(0.0, 0.0), lower_bound, upper_bound, is_geography=True)
+
+
+@pytest.mark.parametrize(
+ ("file_x_min", "file_x_max", "query_x"),
+ [
+ (170.0, 180.0, 180.0),
+ (170.0, 180.0, -180.0),
+ (180.0, 180.0, 180.0),
+ (180.0, 180.0, -180.0),
+ (-180.0, -180.0, 180.0),
+ (-180.0, -180.0, -180.0),
+ ],
+)
+def test_bbox_pruning_geography_antimeridian_seam_is_closed(
+ file_x_min: float,
+ file_x_max: float,
+ query_x: float,
+) -> None:
+ lower_bound, upper_bound = _bounds(file_x_min, 0.0, file_x_max, 10.0)
+
+ assert bbox_might_match("st-intersects", _point_wkb(query_x, 5.0), lower_bound, upper_bound, is_geography=True)
+
+
+@pytest.mark.parametrize(
+ ("file_x_min", "file_y_min", "file_x_max", "file_y_max", "query_x", "query_y"),
+ [
+ (170.0, 0.0, 180.0, 10.0, 0.0, 5.0),
+ (170.0, 0.0, 180.0, 10.0, -90.0, 5.0),
+ (180.0, 0.0, 180.0, 10.0, 179.0, 5.0),
+ (-180.0, 0.0, -180.0, 10.0, -179.0, 5.0),
+ (170.0, 0.0, -170.0, 10.0, 0.0, 5.0),
+ ],
+)
+def test_bbox_pruning_geography_negative_mutation_guard(
+ file_x_min: float,
+ file_y_min: float,
+ file_x_max: float,
+ file_y_max: float,
+ query_x: float,
+ query_y: float,
+) -> None:
+ # Mutation guard: keep at least five False assertions so a return-True stub fails loudly.
+ lower_bound, upper_bound = _bounds(file_x_min, file_y_min, file_x_max, file_y_max)
+
+ assert not bbox_might_match("st-intersects", _point_wkb(query_x, query_y), lower_bound, upper_bound, is_geography=True)
+
+
+def test_bbox_pruning_big_endian_point_wkb_extracts_and_prunes() -> None:
+ query_wkb = _big_endian_point_wkb(5.0, 6.0)
+ envelope = extract_envelope_from_wkb(query_wkb, is_geography=False)
+
+ assert envelope is not None
+ assert envelope.x_min == 5.0
+ assert envelope.x_max == 5.0
+ assert envelope.y_min == 6.0
+ assert envelope.y_max == 6.0
+
+ matching_lower_bound, matching_upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ disjoint_lower_bound, disjoint_upper_bound = _bounds(20.0, 0.0, 30.0, 10.0)
+
+ assert bbox_might_match(
+ "st-intersects",
+ query_wkb,
+ matching_lower_bound,
+ matching_upper_bound,
+ is_geography=False,
+ )
+ assert not bbox_might_match(
+ "st-intersects",
+ query_wkb,
+ disjoint_lower_bound,
+ disjoint_upper_bound,
+ is_geography=False,
+ )
+
+
+def test_bbox_pruning_false_negative_guard_for_points_inside_wrapped_file_bbox() -> None:
+ lower_bound, upper_bound = _bounds(170.0, -20.0, -160.0, 20.0)
+
+ for longitude, latitude in [
+ (170.0, -20.0),
+ (174.5, 3.0),
+ (179.0, 19.0),
+ (180.0, 0.0),
+ (-179.5, -7.0),
+ (-170.0, 20.0),
+ (-160.0, 0.5),
+ ]:
+ assert bbox_might_match(
+ "st-intersects",
+ _point_wkb(longitude, latitude),
+ lower_bound,
+ upper_bound,
+ is_geography=True,
+ )
+
+
+def test_bbox_pruning_ignores_z_and_m_dimensions() -> None:
+ lower_bound = _bound(0.0, 0.0, z=0.0, m=0.0)
+ upper_bound = _bound(10.0, 10.0, z=1.0, m=1.0)
+ query_wkb = _point_xyzm_wkb(5.0, 5.0, 999.0, 999.0)
+
+ assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_empty_query_returns_true() -> None:
+ lower_bound, upper_bound = _bounds(0.0, 0.0, 10.0, 10.0)
+ query_wkb = _linestring_wkb([])
+
+ assert bbox_might_match("st-intersects", query_wkb, lower_bound, upper_bound, is_geography=False)
+
+
+def test_bbox_pruning_geography_preserves_file_edge_point_after_circle_round_trip() -> None:
+ points = [(-9.32, 29.55), (-158.16, 36.27), (52.97, 88.76)]
+ lower_bound, upper_bound = _stats_bounds(points, is_geography=True)
+
+ assert bbox_might_match(
+ "st-intersects",
+ _point_wkb(-158.16, 36.27),
+ lower_bound,
+ upper_bound,
+ is_geography=True,
+ )
+
+
+@pytest.mark.parametrize(
+ ("is_geography", "lon_range", "lat_range"),
+ [
+ (True, (-180.0, 180.0), (-90.0, 90.0)),
+ (False, (-1_000_000.0, 1_000_000.0), (-1_000_000.0, 1_000_000.0)),
+ ],
+)
+def test_bbox_pruning_never_prunes_files_own_points(
+ is_geography: bool,
+ lon_range: tuple[float, float],
+ lat_range: tuple[float, float],
+) -> None:
+ rng = Random(1729 if is_geography else 2718)
+ false_negatives: list[tuple[list[tuple[float, float]], tuple[float, float], str]] = []
+
+ for _ in range(2000):
+ points = [
+ (
+ rng.uniform(*lon_range),
+ rng.uniform(*lat_range),
+ )
+ for _ in range(rng.randint(1, 5))
+ ]
+ lower_bound, upper_bound = _stats_bounds(points, is_geography=is_geography)
+
+ for point in points:
+ query_wkb = _point_wkb(*point)
+ for predicate in _SUPPORTED_PREDICATES:
+ if not bbox_might_match(
+ predicate,
+ query_wkb,
+ lower_bound,
+ upper_bound,
+ is_geography=is_geography,
+ ):
+ false_negatives.append((points, point, predicate))
+
+ assert false_negatives == []
+
+
+def test_bbox_pruning_geography_still_prunes_clearly_disjoint_query() -> None:
+ lower_bound, upper_bound = _stats_bounds(
+ [(-1.0, -1.0), (0.0, 0.0), (1.0, 1.0)],
+ is_geography=True,
+ )
+
+ assert not bbox_might_match("st-intersects", _point_wkb(170.0, 80.0), lower_bound, upper_bound, is_geography=True)
diff --git a/tests/utils/test_geospatial_stats.py b/tests/utils/test_geospatial_stats.py
new file mode 100644
index 0000000000..a8d375e36a
--- /dev/null
+++ b/tests/utils/test_geospatial_stats.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import struct
+
+from pyiceberg.utils.geospatial import GeospatialStatsAggregator, deserialize_geospatial_bound
+
+
+def _point_wkb(x: float, y: float) -> bytes:
+ return struct.pack(" bytes:
+ ordinates = [ordinate for point in points for ordinate in point]
+ return struct.pack(" None:
+ aggregator = GeospatialStatsAggregator(is_geography=False)
+
+ aggregator.add(_point_wkb(10.0, -3.0))
+ aggregator.add(_linestring_xyzm_wkb([(-2.0, 4.0, 7.0, 3.0), (6.0, 9.0, 2.0, 8.0)]))
+
+ min_bound = aggregator.min_bound()
+ max_bound = aggregator.max_bound()
+ assert min_bound is not None
+ assert max_bound is not None
+ assert min_bound.x == -2.0
+ assert min_bound.y == -3.0
+ assert min_bound.z == 2.0
+ assert min_bound.m == 3.0
+ assert max_bound.x == 10.0
+ assert max_bound.y == 9.0
+ assert max_bound.z == 7.0
+ assert max_bound.m == 8.0
+
+ serialized_min = aggregator.serialized_min()
+ serialized_max = aggregator.serialized_max()
+ assert serialized_min is not None
+ assert serialized_max is not None
+ assert deserialize_geospatial_bound(serialized_min) == aggregator.min_bound()
+ assert deserialize_geospatial_bound(serialized_max) == aggregator.max_bound()
+
+
+def test_geospatial_stats_aggregator_geography_wraps_antimeridian() -> None:
+ aggregator = GeospatialStatsAggregator(is_geography=True)
+
+ aggregator.add(_point_wkb(170.0, 1.0))
+ aggregator.add(_point_wkb(-170.0, 2.0))
+
+ min_bound = aggregator.min_bound()
+ max_bound = aggregator.max_bound()
+
+ assert min_bound is not None
+ assert max_bound is not None
+ assert min_bound.x > max_bound.x
+ assert min_bound.x == 170.0
+ assert max_bound.x == -170.0
+ assert min_bound.y == 1.0
+ assert max_bound.y == 2.0
+
+
+def test_geospatial_stats_aggregator_empty_input_has_no_bounds() -> None:
+ aggregator = GeospatialStatsAggregator(is_geography=False)
+
+ assert aggregator.min_bound() is None
+ assert aggregator.max_bound() is None
+ assert aggregator.serialized_min() is None
+ assert aggregator.serialized_max() is None