Skip to content
Open
55 changes: 54 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,49 @@ def data_file_statistics_from_parquet_metadata(
)


def _geospatial_bounds_from_arrow(schema: Schema, arrow_table: pa.Table) -> tuple[dict[int, bytes], dict[int, bytes]]:
"""Compute v3 geospatial lower/upper bounds for any geometry/geography columns.

Geometry/geography values are stored as WKB. Parquet column statistics cannot
bound WKB blobs spatially, so the bounds are derived by extracting each value's
envelope. Returns ``(lower_bounds, upper_bounds)`` keyed by field id, serialized
per the spec (little-endian doubles). Columns with no extractable geometry are
skipped.
"""
from pyiceberg.types import GeographyType, GeometryType
from pyiceberg.utils.geospatial import GeospatialStatsAggregator

lower: dict[int, bytes] = {}
upper: dict[int, bytes] = {}

for field in schema.as_struct().fields:
if not isinstance(field.field_type, (GeometryType, GeographyType)):
continue
if field.name not in arrow_table.column_names:
continue

is_geography = isinstance(field.field_type, GeographyType)
aggregator = GeospatialStatsAggregator(is_geography=is_geography)
column = arrow_table.column(field.name)
# Geometry/geography columns are WKB. When geoarrow-pyarrow is present the
# column is a WKB extension type whose storage holds the raw WKB bytes; unwrap
# it so the envelope extractor sees bytes rather than parsed geometry objects.
for chunk in column.chunks:
storage = chunk.storage if isinstance(chunk.type, pa.ExtensionType) else chunk
for value in storage.to_pylist():
if value is None:
continue
aggregator.add(bytes(value))

serialized_min = aggregator.serialized_min()
serialized_max = aggregator.serialized_max()
if serialized_min is not None and serialized_max is not None:
lower[field.field_id] = serialized_min
upper[field.field_id] = serialized_max

return lower, upper


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

Expand Down Expand Up @@ -2660,6 +2703,16 @@ def write_parquet(task: WriteTask) -> DataFile:
stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
)
serialized_stats = statistics.to_serialized_dict()

# v3 geospatial bounds: parquet column stats cannot bound WKB spatially, so
# derive lower/upper bounds from the geometry/geography column envelopes and
# merge them into the data file bounds.
geo_lower, geo_upper = _geospatial_bounds_from_arrow(file_schema, arrow_table)
if geo_lower:
serialized_stats["lower_bounds"] = {**serialized_stats.get("lower_bounds", {}), **geo_lower}
serialized_stats["upper_bounds"] = {**serialized_stats.get("upper_bounds", {}), **geo_upper}

data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=file_path,
Expand All @@ -2674,7 +2727,7 @@ def write_parquet(task: WriteTask) -> DataFile:
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict(),
**serialized_stats,
)

return data_file
Expand Down
170 changes: 160 additions & 10 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2
DEFAULT_READ_VERSION: Literal[3] = 3

INITIAL_SEQUENCE_NUMBER = 0

Expand Down Expand Up @@ -531,6 +531,46 @@ def equality_ids(self) -> list[int] | None:
def sort_order_id(self) -> int | None:
return self._data[15]

@property
def first_row_id(self) -> int | None:
"""The _row_id for the first row in the data file (field 142, v3+ only).

Note: ``DataFile.from_args`` always allocates the full v3-width record
(``len == 20``) regardless of table version, so a v1/v2 file simply holds
``None`` at position 16. The length guard below only matters for records
constructed directly from a shorter positional list.
"""
if len(self._data) > 16:
return self._data[16]
return None

@first_row_id.setter
def first_row_id(self, value: int | None) -> None:
if len(self._data) <= 16:
raise ValueError("Cannot set first_row_id: record has no field-142 slot")
self._data[16] = value

@property
def referenced_data_file(self) -> str | None:
"""Location of the data file that all deletes in this file reference (field 143, v3+)."""
if len(self._data) > 17:
return self._data[17]
return None

@property
def content_offset(self) -> int | None:
"""Offset in the file where the deletion-vector blob starts (field 144, v3+)."""
if len(self._data) > 18:
return self._data[18]
return None

@property
def content_size_in_bytes(self) -> int | None:
"""Length of the referenced deletion-vector blob (field 145, v3+)."""
if len(self._data) > 19:
return self._data[19]
return None

# Spec ID should not be stored in the file
_spec_id: int

Expand All @@ -549,6 +589,23 @@ def __setattr__(self, name: str, value: Any) -> None:
value = FileFormat[value]
super().__setattr__(name, value)

def __copy__(self) -> DataFile:
"""Return a copy whose ``_data`` list is independent from the original.

``Record`` stores its fields in a mutable ``_data`` list, so the default
``copy.copy`` would share that list and a mutation on the copy (e.g.
assigning ``first_row_id`` while materializing row lineage on a delete)
would leak back into the source DataFile. Copying the list isolates
field-level mutations to the copy while preserving ``_spec_id``.
"""
cls = self.__class__
new = cls.__new__(cls)
new._data = list(self._data)
instance_dict = getattr(self, "__dict__", None)
if instance_dict:
new.__dict__.update(instance_dict)
return new

def __hash__(self) -> int:
"""Return the hash of the file path."""
return hash(self.file_path)
Expand Down Expand Up @@ -852,6 +909,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None:
def key_metadata(self) -> bytes | None:
return self._data[14]

@property
def first_row_id(self) -> int | None:
return self._data[15] if len(self._data) > 15 else None

@first_row_id.setter
def first_row_id(self, value: int | None) -> None:
if len(self._data) <= 15:
self._data.append(value)
else:
self._data[15] = value

def has_added_files(self) -> bool:
return self.added_files_count is None or self.added_files_count > 0

Expand Down Expand Up @@ -1014,6 +1082,7 @@ class ManifestWriter(ABC):
_min_sequence_number: int | None
_partitions: list[Record]
_compression: AvroCompressionCodec
_content: ManifestContent

def __init__(
self,
Expand All @@ -1022,12 +1091,14 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
) -> None:
self.closed = False
self._spec = spec
self._schema = schema
self._output_file = output_file
self._snapshot_id = snapshot_id
self._content = content

self._added_files = 0
self._added_rows = 0
Expand Down Expand Up @@ -1192,8 +1263,11 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
if content != ManifestContent.DATA:
raise ValueError("Cannot write delete manifests in a v1 table")
super().__init__(spec, schema, output_file, snapshot_id, avro_compression, content)

def content(self) -> ManifestContent:
return ManifestContent.DATA
Expand All @@ -1214,11 +1288,12 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
super().__init__(spec, schema, output_file, snapshot_id, avro_compression, content)

def content(self) -> ManifestContent:
return ManifestContent.DATA
return self._content

@property
def version(self) -> TableVersion:
Expand All @@ -1228,7 +1303,7 @@ def version(self) -> TableVersion:
def _meta(self) -> dict[str, str]:
return {
**super()._meta,
"content": "data",
"content": "deletes" if self._content == ManifestContent.DELETES else "data",
}

def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
Expand All @@ -1240,18 +1315,27 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
return entry


class ManifestWriterV3(ManifestWriterV2):
@property
def version(self) -> TableVersion:
return 3


def write_manifest(
format_version: TableVersion,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
) -> ManifestWriter:
if format_version == 1:
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression, content)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression, content)
elif format_version == 3:
return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression, content)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down Expand Up @@ -1295,6 +1379,10 @@ def __exit__(
@abstractmethod
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...

@property
def next_row_id(self) -> int | None:
return None

def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter:
self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
return self
Expand Down Expand Up @@ -1351,9 +1439,7 @@ def __init__(
self._commit_snapshot_id = snapshot_id
self._sequence_number = sequence_number

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file = copy(manifest_file)

def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile:
if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
# if the sequence number is being assigned here, then the manifest must be created by the current operation.
# To validate this, check that the snapshot id matches the current commit
Expand All @@ -1374,6 +1460,56 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file.min_sequence_number = self._sequence_number
return wrapped_manifest_file

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
return self._prepare_manifest_for_commit(copy(manifest_file))


class ManifestListWriterV3(ManifestListWriterV2):
_next_row_id: int

def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: int | None,
sequence_number: int,
snapshot_first_row_id: int,
compression: AvroCompressionCodec,
):
super().__init__(
output_file=output_file,
snapshot_id=snapshot_id,
parent_snapshot_id=parent_snapshot_id,
sequence_number=sequence_number,
compression=compression,
)
self._format_version = 3
self._meta = {
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"first-row-id": str(snapshot_first_row_id),
"format-version": "3",
AVRO_CODEC_KEY: compression,
}
self._next_row_id = snapshot_first_row_id

@property
def next_row_id(self) -> int | None:
return self._next_row_id

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped = self._prepare_manifest_for_commit(copy(manifest_file))
if wrapped.content == ManifestContent.DATA and wrapped.first_row_id is None:
if wrapped.existing_rows_count is None or wrapped.added_rows_count is None:
raise ValueError(
"Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: "
+ wrapped.manifest_path
)
wrapped.first_row_id = self._next_row_id
self._next_row_id += wrapped.existing_rows_count + wrapped.added_rows_count
return wrapped


def write_manifest_list(
format_version: TableVersion,
Expand All @@ -1382,12 +1518,26 @@ def write_manifest_list(
parent_snapshot_id: int | None,
sequence_number: int | None,
avro_compression: AvroCompressionCodec,
snapshot_first_row_id: int | None = None,
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
elif format_version == 2:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
elif format_version == 3:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}")
if snapshot_first_row_id is None:
raise ValueError(f"Snapshot-first-row-id is required for V3 tables: {snapshot_first_row_id}")
return ManifestListWriterV3(
output_file=output_file,
snapshot_id=snapshot_id,
parent_snapshot_id=parent_snapshot_id,
sequence_number=sequence_number,
snapshot_first_row_id=snapshot_first_row_id,
compression=avro_compression,
)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")
Loading