Skip to content
164 changes: 158 additions & 6 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2
DEFAULT_READ_VERSION: Literal[3] = 3

INITIAL_SEQUENCE_NUMBER = 0

Expand Down Expand Up @@ -531,6 +531,25 @@ def equality_ids(self) -> list[int] | None:
def sort_order_id(self) -> int | None:
return self._data[15]

@property
def first_row_id(self) -> int | None:
"""The _row_id for the first row in the data file (field 142, v3+ only).

Note: ``DataFile.from_args`` always allocates the full v3-width record
(``len == 20``) regardless of table version, so a v1/v2 file simply holds
``None`` at position 16. The length guard below only matters for records
constructed directly from a shorter positional list.
"""
if len(self._data) > 16:
return self._data[16]
return None

@first_row_id.setter
def first_row_id(self, value: int | None) -> None:
if len(self._data) <= 16:
raise ValueError("Cannot set first_row_id: record has no field-142 slot")
self._data[16] = value

# Spec ID should not be stored in the file
_spec_id: int

Expand All @@ -549,6 +568,23 @@ def __setattr__(self, name: str, value: Any) -> None:
value = FileFormat[value]
super().__setattr__(name, value)

def __copy__(self) -> DataFile:
"""Return a copy whose ``_data`` list is independent from the original.

``Record`` stores its fields in a mutable ``_data`` list, so the default
``copy.copy`` would share that list and a mutation on the copy (e.g.
assigning ``first_row_id`` while materializing row lineage on a delete)
would leak back into the source DataFile. Copying the list isolates
field-level mutations to the copy while preserving ``_spec_id``.
"""
cls = self.__class__
new = cls.__new__(cls)
new._data = list(self._data)
instance_dict = getattr(self, "__dict__", None)
if instance_dict:
new.__dict__.update(instance_dict)
return new

def __hash__(self) -> int:
"""Return the hash of the file path."""
return hash(self.file_path)
Expand Down Expand Up @@ -852,6 +888,35 @@ def partitions(self) -> list[PartitionFieldSummary] | None:
def key_metadata(self) -> bytes | None:
return self._data[14]

@property
def first_row_id(self) -> int | None:
return self._data[15] if len(self._data) > 15 else None

@first_row_id.setter
def first_row_id(self, value: int | None) -> None:
if len(self._data) <= 15:
self._data.append(value)
else:
self._data[15] = value

def __copy__(self) -> ManifestFile:
"""Return a shallow copy that owns an independent data list."""
cls = self.__class__
new = cls.__new__(cls)
new._data = list(self._data)
instance_dict = getattr(self, "__dict__", None)
if instance_dict:
new.__dict__.update(instance_dict)
for klass in cls.__mro__:
slots = getattr(klass, "__slots__", ())
if isinstance(slots, str):
slots = (slots,)
for slot in slots:
if slot in {"_data", "__dict__", "__weakref__"} or not hasattr(self, slot):
continue
setattr(new, slot, getattr(self, slot))
return new

def has_added_files(self) -> bool:
return self.added_files_count is None or self.added_files_count > 0

Expand Down Expand Up @@ -931,8 +996,21 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
with _manifest_cache_lock:
for manifest_file in manifest_files:
manifest_path = manifest_file.manifest_path
if manifest_path in _manifest_cache:
result.append(_manifest_cache[manifest_path])
cached = _manifest_cache.get(manifest_path)
if cached is not None:
# first_row_id is assigned by the manifest-list writer and is therefore
# specific to the manifest list that references this manifest, not an
# intrinsic property of the manifest file. The same physical manifest can be
# referenced with first_row_id=None by an older (e.g. pre-v3-upgrade) list and
# with a concrete value by a v3 list, so the freshly read value must win;
# reusing the cached one would re-number carried-forward rows. Return a copy
# carrying this list's first_row_id rather than mutating the shared cached object.
if cached.first_row_id != manifest_file.first_row_id:
refreshed = copy(cached)
refreshed.first_row_id = manifest_file.first_row_id
result.append(refreshed)
else:
result.append(cached)
else:
_manifest_cache[manifest_path] = manifest_file
result.append(manifest_file)
Expand Down Expand Up @@ -1240,6 +1318,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
return entry


class ManifestWriterV3(ManifestWriterV2):
@property
def version(self) -> TableVersion:
return 3


def write_manifest(
format_version: TableVersion,
spec: PartitionSpec,
Expand All @@ -1252,6 +1336,8 @@ def write_manifest(
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 3:
return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down Expand Up @@ -1295,6 +1381,10 @@ def __exit__(
@abstractmethod
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...

@property
def next_row_id(self) -> int | None:
return None

def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter:
self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
return self
Expand Down Expand Up @@ -1351,9 +1441,7 @@ def __init__(
self._commit_snapshot_id = snapshot_id
self._sequence_number = sequence_number

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file = copy(manifest_file)

def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile:
if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
# if the sequence number is being assigned here, then the manifest must be created by the current operation.
# To validate this, check that the snapshot id matches the current commit
Expand All @@ -1374,6 +1462,56 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file.min_sequence_number = self._sequence_number
return wrapped_manifest_file

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
return self._prepare_manifest_for_commit(copy(manifest_file))


class ManifestListWriterV3(ManifestListWriterV2):
_next_row_id: int

def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: int | None,
sequence_number: int,
snapshot_first_row_id: int,
compression: AvroCompressionCodec,
):
super().__init__(
output_file=output_file,
snapshot_id=snapshot_id,
parent_snapshot_id=parent_snapshot_id,
sequence_number=sequence_number,
compression=compression,
)
self._format_version = 3
self._meta = {
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"first-row-id": str(snapshot_first_row_id),
"format-version": "3",
AVRO_CODEC_KEY: compression,
}
self._next_row_id = snapshot_first_row_id

@property
def next_row_id(self) -> int | None:
return self._next_row_id

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped = self._prepare_manifest_for_commit(copy(manifest_file))
if wrapped.content == ManifestContent.DATA and wrapped.first_row_id is None:
if wrapped.existing_rows_count is None or wrapped.added_rows_count is None:
raise ValueError(
"Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: "
+ wrapped.manifest_path
)
wrapped.first_row_id = self._next_row_id
self._next_row_id += wrapped.existing_rows_count + wrapped.added_rows_count
return wrapped


def write_manifest_list(
format_version: TableVersion,
Expand All @@ -1382,12 +1520,26 @@ def write_manifest_list(
parent_snapshot_id: int | None,
sequence_number: int | None,
avro_compression: AvroCompressionCodec,
snapshot_first_row_id: int | None = None,
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
elif format_version == 2:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
elif format_version == 3:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}")
if snapshot_first_row_id is None:
raise ValueError(f"Snapshot-first-row-id is required for V3 tables: {snapshot_first_row_id}")
return ManifestListWriterV3(
output_file=output_file,
snapshot_id=snapshot_id,
parent_snapshot_id=parent_snapshot_id,
sequence_number=sequence_number,
snapshot_first_row_id=snapshot_first_row_id,
compression=avro_compression,
)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")
28 changes: 24 additions & 4 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.maintenance import MaintenanceTable
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
Expand Down Expand Up @@ -293,7 +293,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
Returns:
The alter table builder.
"""
if format_version not in {1, 2}:
if not 1 <= format_version <= SUPPORTED_TABLE_FORMAT_VERSION:
raise ValueError(f"Unsupported table format version: {format_version}")

if format_version < self.table_metadata.format_version:
Expand Down Expand Up @@ -725,10 +725,30 @@ def delete(
if isinstance(delete_filter, str):
delete_filter = _parse_row_filter(delete_filter)

with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)
delete_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete()
delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
if self.table_metadata.format_version >= 3:
# A partial (copy-on-write) delete physically rewrites surviving rows into a
# NEW data file. Per the v3 spec those rows must KEEP their original _row_id
# (field 142 / the _row_id metadata column). After a partial delete the
# survivors are generally non-contiguous (e.g. 0,1,4,5), so their lineage can
# only be preserved by materializing an explicit per-row _row_id column on read
# and persisting it on rewrite. PyIceberg has no read-side _row_id
# materialization yet, so doing this rewrite would silently RE-NUMBER the
# surviving rows and corrupt row lineage. We fail loudly instead.
raise NotImplementedError(
"v3 copy-on-write delete that requires rewriting a data file is not "
"supported yet: surviving rows would lose their _row_id lineage because "
"PyIceberg cannot materialize/preserve per-row _row_id (field 142) across a "
"physical rewrite. Whole-file deletes (which drop entire data files and "
"preserve survivors' row ids) are supported. Track: read-side _row_id "
"materialization."
)
delete_snapshot.commit()

if delete_snapshot.rewrites_needed is True:
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema())
Expand Down
6 changes: 2 additions & 4 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
INITIAL_SPEC_ID = 0
DEFAULT_SCHEMA_ID = 0

SUPPORTED_TABLE_FORMAT_VERSION = 2
SUPPORTED_TABLE_FORMAT_VERSION = 3


def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata:
next_row_id: int | None = Field(alias="next-row-id", default=None)
"""A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""

def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str:
raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")


TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")]

Expand Down Expand Up @@ -645,6 +642,7 @@ def new_table_metadata(
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
next_row_id=0,
)
else:
raise ValidationError(f"Unknown format version: {format_version}")
Expand Down
Loading