diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..b8e5cbf3fb 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -54,7 +54,7 @@ UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 -DEFAULT_READ_VERSION: Literal[2] = 2 +DEFAULT_READ_VERSION: Literal[3] = 3 INITIAL_SEQUENCE_NUMBER = 0 @@ -531,6 +531,25 @@ def equality_ids(self) -> list[int] | None: def sort_order_id(self) -> int | None: return self._data[15] + @property + def first_row_id(self) -> int | None: + """The _row_id for the first row in the data file (field 142, v3+ only). + + Note: ``DataFile.from_args`` always allocates the full v3-width record + (``len == 20``) regardless of table version, so a v1/v2 file simply holds + ``None`` at position 16. The length guard below only matters for records + constructed directly from a shorter positional list. + """ + if len(self._data) > 16: + return self._data[16] + return None + + @first_row_id.setter + def first_row_id(self, value: int | None) -> None: + if len(self._data) <= 16: + raise ValueError("Cannot set first_row_id: record has no field-142 slot") + self._data[16] = value + # Spec ID should not be stored in the file _spec_id: int @@ -549,6 +568,23 @@ def __setattr__(self, name: str, value: Any) -> None: value = FileFormat[value] super().__setattr__(name, value) + def __copy__(self) -> DataFile: + """Return a copy whose ``_data`` list is independent from the original. + + ``Record`` stores its fields in a mutable ``_data`` list, so the default + ``copy.copy`` would share that list and a mutation on the copy (e.g. + assigning ``first_row_id`` while materializing row lineage on a delete) + would leak back into the source DataFile. Copying the list isolates + field-level mutations to the copy while preserving ``_spec_id``. + """ + cls = self.__class__ + new = cls.__new__(cls) + new._data = list(self._data) + instance_dict = getattr(self, "__dict__", None) + if instance_dict: + new.__dict__.update(instance_dict) + return new + def __hash__(self) -> int: """Return the hash of the file path.""" return hash(self.file_path) @@ -852,6 +888,35 @@ def partitions(self) -> list[PartitionFieldSummary] | None: def key_metadata(self) -> bytes | None: return self._data[14] + @property + def first_row_id(self) -> int | None: + return self._data[15] if len(self._data) > 15 else None + + @first_row_id.setter + def first_row_id(self, value: int | None) -> None: + if len(self._data) <= 15: + self._data.append(value) + else: + self._data[15] = value + + def __copy__(self) -> ManifestFile: + """Return a shallow copy that owns an independent data list.""" + cls = self.__class__ + new = cls.__new__(cls) + new._data = list(self._data) + instance_dict = getattr(self, "__dict__", None) + if instance_dict: + new.__dict__.update(instance_dict) + for klass in cls.__mro__: + slots = getattr(klass, "__slots__", ()) + if isinstance(slots, str): + slots = (slots,) + for slot in slots: + if slot in {"_data", "__dict__", "__weakref__"} or not hasattr(self, slot): + continue + setattr(new, slot, getattr(self, slot)) + return new + def has_added_files(self) -> bool: return self.added_files_count is None or self.added_files_count > 0 @@ -931,8 +996,21 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: with _manifest_cache_lock: for manifest_file in manifest_files: manifest_path = manifest_file.manifest_path - if manifest_path in _manifest_cache: - result.append(_manifest_cache[manifest_path]) + cached = _manifest_cache.get(manifest_path) + if cached is not None: + # first_row_id is assigned by the manifest-list writer and is therefore + # specific to the manifest list that references this manifest, not an + # intrinsic property of the manifest file. The same physical manifest can be + # referenced with first_row_id=None by an older (e.g. pre-v3-upgrade) list and + # with a concrete value by a v3 list, so the freshly read value must win; + # reusing the cached one would re-number carried-forward rows. Return a copy + # carrying this list's first_row_id rather than mutating the shared cached object. + if cached.first_row_id != manifest_file.first_row_id: + refreshed = copy(cached) + refreshed.first_row_id = manifest_file.first_row_id + result.append(refreshed) + else: + result.append(cached) else: _manifest_cache[manifest_path] = manifest_file result.append(manifest_file) @@ -1240,6 +1318,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: return entry +class ManifestWriterV3(ManifestWriterV2): + @property + def version(self) -> TableVersion: + return 3 + + def write_manifest( format_version: TableVersion, spec: PartitionSpec, @@ -1252,6 +1336,8 @@ def write_manifest( return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) + elif format_version == 3: + return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") @@ -1295,6 +1381,10 @@ def __exit__( @abstractmethod def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ... + @property + def next_row_id(self) -> int | None: + return None + def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter: self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files]) return self @@ -1351,9 +1441,7 @@ def __init__( self._commit_snapshot_id = snapshot_id self._sequence_number = sequence_number - def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: - wrapped_manifest_file = copy(manifest_file) - + def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile: if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ: # if the sequence number is being assigned here, then the manifest must be created by the current operation. # To validate this, check that the snapshot id matches the current commit @@ -1374,6 +1462,56 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: wrapped_manifest_file.min_sequence_number = self._sequence_number return wrapped_manifest_file + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + return self._prepare_manifest_for_commit(copy(manifest_file)) + + +class ManifestListWriterV3(ManifestListWriterV2): + _next_row_id: int + + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: int | None, + sequence_number: int, + snapshot_first_row_id: int, + compression: AvroCompressionCodec, + ): + super().__init__( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + compression=compression, + ) + self._format_version = 3 + self._meta = { + "snapshot-id": str(snapshot_id), + "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", + "sequence-number": str(sequence_number), + "first-row-id": str(snapshot_first_row_id), + "format-version": "3", + AVRO_CODEC_KEY: compression, + } + self._next_row_id = snapshot_first_row_id + + @property + def next_row_id(self) -> int | None: + return self._next_row_id + + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + wrapped = self._prepare_manifest_for_commit(copy(manifest_file)) + if wrapped.content == ManifestContent.DATA and wrapped.first_row_id is None: + if wrapped.existing_rows_count is None or wrapped.added_rows_count is None: + raise ValueError( + "Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: " + + wrapped.manifest_path + ) + wrapped.first_row_id = self._next_row_id + self._next_row_id += wrapped.existing_rows_count + wrapped.added_rows_count + return wrapped + def write_manifest_list( format_version: TableVersion, @@ -1382,6 +1520,7 @@ def write_manifest_list( parent_snapshot_id: int | None, sequence_number: int | None, avro_compression: AvroCompressionCodec, + snapshot_first_row_id: int | None = None, ) -> ManifestListWriter: if format_version == 1: return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression) @@ -1389,5 +1528,18 @@ def write_manifest_list( if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression) + elif format_version == 3: + if sequence_number is None: + raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}") + if snapshot_first_row_id is None: + raise ValueError(f"Snapshot-first-row-id is required for V3 tables: {snapshot_first_row_id}") + return ManifestListWriterV3( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + snapshot_first_row_id=snapshot_first_row_id, + compression=avro_compression, + ) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..94cc05dce1 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -48,7 +48,7 @@ from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.maintenance import MaintenanceTable -from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata +from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry @@ -293,7 +293,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction: Returns: The alter table builder. """ - if format_version not in {1, 2}: + if not 1 <= format_version <= SUPPORTED_TABLE_FORMAT_VERSION: raise ValueError(f"Unsupported table format version: {format_version}") if format_version < self.table_metadata.format_version: @@ -725,10 +725,30 @@ def delete( if isinstance(delete_filter, str): delete_filter = _parse_row_filter(delete_filter) - with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: - delete_snapshot.delete_by_predicate(delete_filter, case_sensitive) + delete_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() + delete_snapshot.delete_by_predicate(delete_filter, case_sensitive) # Check if there are any files that require an actual rewrite of a data file + if delete_snapshot.rewrites_needed is True: + if self.table_metadata.format_version >= 3: + # A partial (copy-on-write) delete physically rewrites surviving rows into a + # NEW data file. Per the v3 spec those rows must KEEP their original _row_id + # (field 142 / the _row_id metadata column). After a partial delete the + # survivors are generally non-contiguous (e.g. 0,1,4,5), so their lineage can + # only be preserved by materializing an explicit per-row _row_id column on read + # and persisting it on rewrite. PyIceberg has no read-side _row_id + # materialization yet, so doing this rewrite would silently RE-NUMBER the + # surviving rows and corrupt row lineage. We fail loudly instead. + raise NotImplementedError( + "v3 copy-on-write delete that requires rewriting a data file is not " + "supported yet: surviving rows would lose their _row_id lineage because " + "PyIceberg cannot materialize/preserve per-row _row_id (field 142) across a " + "physical rewrite. Whole-file deletes (which drop entire data files and " + "preserve survivors' row ids) are supported. Track: read-side _row_id " + "materialization." + ) + delete_snapshot.commit() + if delete_snapshot.rewrites_needed is True: bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 26b6e3d3ad..329d4c3a7a 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -66,7 +66,7 @@ INITIAL_SPEC_ID = 0 DEFAULT_SCHEMA_ID = 0 -SUPPORTED_TABLE_FORMAT_VERSION = 2 +SUPPORTED_TABLE_FORMAT_VERSION = 3 def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]: @@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata: next_row_id: int | None = Field(alias="next-row-id", default=None) """A long higher than all assigned row IDs; the next snapshot's `first-row-id`.""" - def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str: - raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551") - TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")] @@ -645,6 +642,7 @@ def new_table_metadata( properties=properties, last_partition_id=fresh_partition_spec.last_assigned_field_id, table_uuid=table_uuid, + next_row_id=0, ) else: raise ValidationError(f"Unknown format version: {format_version}") diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 64838b0bd6..8e01a76eb1 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -29,7 +29,7 @@ from pyiceberg.exceptions import CommitFailedException from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil +from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, @@ -322,9 +322,16 @@ def _( return base_metadata updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version}) + updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata) + if ( + isinstance(updated_metadata, TableMetadataV3) + and base_metadata.format_version < 3 + and updated_metadata.next_row_id is None + ): + updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0}) context.add_update(update) - return TableMetadataUtil._construct_without_validation(updated_metadata) + return updated_metadata @_apply_table_update.register(SetPropertiesUpdate) @@ -435,7 +442,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None: raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists") elif ( - base_metadata.format_version == 2 + base_metadata.format_version >= 2 and update.snapshot.sequence_number is not None and update.snapshot.sequence_number <= base_metadata.last_sequence_number and update.snapshot.parent_snapshot_id is not None @@ -446,6 +453,10 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe ) elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None: raise ValueError("Cannot add snapshot without first row id") + elif base_metadata.format_version >= 3 and update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + elif base_metadata.format_version >= 3 and base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") elif ( base_metadata.format_version >= 3 and update.snapshot.first_row_id is not None @@ -458,18 +469,21 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe ) context.add_update(update) - return base_metadata.model_copy( - update={ - "last_updated_ms": update.snapshot.timestamp_ms, - "last_sequence_number": update.snapshot.sequence_number, - "snapshots": base_metadata.snapshots + [update.snapshot], - "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows - if base_metadata.format_version >= 3 - and base_metadata.next_row_id is not None - and update.snapshot.added_rows is not None - else None, - } - ) + metadata_updates: dict[str, Any] = { + "last_updated_ms": update.snapshot.timestamp_ms, + "last_sequence_number": update.snapshot.sequence_number, + "snapshots": base_metadata.snapshots + [update.snapshot], + } + if base_metadata.format_version >= 3: + if base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") + if update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + if update.snapshot.first_row_id is None: + raise ValueError("Cannot add snapshot without first row id") + metadata_updates["next_row_id"] = update.snapshot.first_row_id + update.snapshot.added_rows + + return base_metadata.model_copy(update=metadata_updates) @_apply_table_update.register(SetSnapshotRefUpdate) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 7931edacdd..35baee888b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -21,6 +21,7 @@ from abc import abstractmethod from collections import defaultdict from collections.abc import Callable +from copy import copy from datetime import datetime from functools import cached_property from typing import TYPE_CHECKING, Generic @@ -264,8 +265,9 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ) def _commit(self) -> UpdatesAndRequirements: + table_metadata = self._transaction.table_metadata new_manifests = self._manifests() - next_sequence_number = self._transaction.table_metadata.next_sequence_number() + next_sequence_number = table_metadata.next_sequence_number() summary = self._summary(self.snapshot_properties) file_name = _new_manifest_list_file_name( @@ -276,20 +278,31 @@ def _commit(self) -> UpdatesAndRequirements: location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + if table_metadata.format_version >= 3: + snapshot_first_row_id = table_metadata.next_row_id + if snapshot_first_row_id is None: + raise ValueError("Cannot commit to a v3 table without next-row-id") + else: + snapshot_first_row_id = None + with write_manifest_list( - format_version=self._transaction.table_metadata.format_version, + format_version=table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, sequence_number=next_sequence_number, avro_compression=self._compression, + snapshot_first_row_id=snapshot_first_row_id, ) as writer: writer.add_manifests(new_manifests) - first_row_id: int | None = None - - if self._transaction.table_metadata.format_version >= 3: - first_row_id = self._transaction.table_metadata.next_row_id + if table_metadata.format_version >= 3: + if writer.next_row_id is None or snapshot_first_row_id is None: + raise ValueError("Cannot determine added rows for a v3 snapshot without row IDs") + added_rows = writer.next_row_id - snapshot_first_row_id + else: + added_rows = None + first_row_id = snapshot_first_row_id snapshot = Snapshot( snapshot_id=self._snapshot_id, @@ -297,8 +310,9 @@ def _commit(self) -> UpdatesAndRequirements: manifest_list=manifest_list_file_path, sequence_number=next_sequence_number, summary=summary, - schema_id=self._transaction.table_metadata.current_schema_id, + schema_id=table_metadata.current_schema_id, first_row_id=first_row_id, + added_rows=added_rows, ) add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot) @@ -415,7 +429,9 @@ def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], boo - Flag indicating that rewrites of data-files are needed. """ - def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: + def _copy_with_new_status( + entry: ManifestEntry, status: ManifestEntryStatus, data_file: DataFile | None = None + ) -> ManifestEntry: return ManifestEntry.from_args( status=status, # When a file is replaced or deleted from the dataset, its manifest entry fields store the @@ -423,11 +439,12 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> snapshot_id=self.snapshot_id if status == ManifestEntryStatus.DELETED else entry.snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, + data_file=data_file if data_file is not None else entry.data_file, ) # avoid copying metadata for each evaluator table_metadata = self._transaction.table_metadata + is_v3 = table_metadata.format_version >= 3 schema = table_metadata.schema() manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) @@ -456,16 +473,59 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # It is relevant, let's check out the content deleted_entries = [] existing_entries = [] + # For v3 tables, surviving data files must KEEP the row ids they were + # already assigned. Rewriting the manifest would otherwise let the + # manifest-list writer re-number them (the source manifest's + # first_row_id is dropped). We materialize each surviving file's + # absolute _row_id into DataFile field 142 so it survives the rewrite. + # A file inherits row id = manifest.first_row_id + sum(record_count of + # all preceding data files in the manifest), unless it already carries + # an explicit field-142 value. + row_id_cursor: int | None = manifest_file.first_row_id if is_v3 else None for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + materialized_data_file = entry.data_file + if is_v3: + explicit_first_row_id = materialized_data_file.first_row_id + if explicit_first_row_id is not None: + row_id_cursor = explicit_first_row_id + if row_id_cursor is None: + raise NotImplementedError( + "Cannot perform a v3 copy-on-write delete that preserves row " + "lineage: the source manifest is missing a first_row_id and " + f"the data file {materialized_data_file.file_path} has no " + "explicit field-142 first_row_id, so surviving rows cannot be " + "renumber-safely rewritten." + ) + if explicit_first_row_id is None: + # Persist the inherited absolute row id so it survives the rewrite. + materialized_data_file = copy(materialized_data_file) + materialized_data_file.first_row_id = row_id_cursor if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: # Based on the metadata, it can be dropped right away - deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) + deleted_entries.append( + _copy_with_new_status( + entry, + ManifestEntryStatus.DELETED, + materialized_data_file if is_v3 else entry.data_file, + ) + ) self._deleted_data_files.add(entry.data_file) else: # Based on the metadata, we cannot determine if it can be deleted - existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + surviving_entry = _copy_with_new_status(entry, ManifestEntryStatus.EXISTING) + if is_v3: + surviving_entry = ManifestEntry.from_args( + status=surviving_entry.status, + snapshot_id=surviving_entry.snapshot_id, + sequence_number=surviving_entry.sequence_number, + file_sequence_number=surviving_entry.file_sequence_number, + data_file=materialized_data_file, + ) + existing_entries.append(surviving_entry) if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH: partial_rewrites_needed = True + if is_v3 and row_id_cursor is not None: + row_id_cursor += materialized_data_file.record_count if len(deleted_entries) > 0: total_deleted_entries += deleted_entries @@ -475,7 +535,15 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> with self.new_manifest_writer(spec=self.spec(manifest_file.partition_spec_id)) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) - existing_manifests.append(writer.to_manifest_file()) + rewritten_manifest = writer.to_manifest_file() + if is_v3: + # Every surviving data file now carries its own field-142 + # row id, so the manifest-list writer must NOT re-assign a + # block. Inherit the source manifest's first_row_id (its + # range start) to keep the manifest itself row-id stable and + # prevent re-numbering of survivors. + rewritten_manifest.first_row_id = manifest_file.first_row_id + existing_manifests.append(rewritten_manifest) else: existing_manifests.append(manifest_file) else: @@ -592,6 +660,7 @@ def _existing_manifests(self) -> list[ManifestFile]: """Determine if there are any existing manifest files.""" existing_files = [] + is_v3 = self._transaction.table_metadata.format_version >= 3 manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): for manifest_file in snapshot.manifests(io=self._io): @@ -600,14 +669,9 @@ def _existing_manifests(self) -> list[ManifestFile]: existing_files.append(manifest_file) continue - entries_to_write: set[ManifestEntry] = set() - found_deleted_entries: set[ManifestEntry] = set() - - for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): - if entry.data_file in self._deleted_data_files: - found_deleted_entries.add(entry) - else: - entries_to_write.add(entry) + is_v3_data_manifest = is_v3 and manifest_file.content == ManifestContent.DATA + live_entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) + found_deleted_entries = [entry for entry in live_entries if entry.data_file in self._deleted_data_files] # Is the intercept the empty set? if len(found_deleted_entries) == 0: @@ -615,22 +679,49 @@ def _existing_manifests(self) -> list[ManifestFile]: continue # Delete all files from manifest - if len(entries_to_write) == 0: + if len(found_deleted_entries) == len(live_entries): continue - # We have to rewrite the manifest file without the deleted data files - with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: - for entry in entries_to_write: - writer.add_entry( + entries_to_write: list[ManifestEntry] = [] + row_id_cursor: int | None = manifest_file.first_row_id if is_v3_data_manifest else None + for entry in live_entries: + materialized_data_file = entry.data_file + if is_v3_data_manifest: + explicit_first_row_id = materialized_data_file.first_row_id + if explicit_first_row_id is not None: + row_id_cursor = explicit_first_row_id + if entry.data_file not in self._deleted_data_files: + if row_id_cursor is None: + raise NotImplementedError( + "Cannot perform a v3 overwrite that preserves row lineage: the source " + "manifest is missing a first_row_id and the surviving data file " + f"{materialized_data_file.file_path} has no explicit field-142 " + "first_row_id." + ) + if explicit_first_row_id is None: + materialized_data_file = copy(materialized_data_file) + materialized_data_file.first_row_id = row_id_cursor + if entry.data_file not in self._deleted_data_files: + entries_to_write.append( ManifestEntry.from_args( status=ManifestEntryStatus.EXISTING, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, + data_file=materialized_data_file, ) ) - existing_files.append(writer.to_manifest_file()) + if is_v3_data_manifest and row_id_cursor is not None: + row_id_cursor += materialized_data_file.record_count + + # We have to rewrite the manifest file without the deleted data files + with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: + for entry in entries_to_write: + writer.add_entry(entry) + rewritten_manifest = writer.to_manifest_file() + if is_v3_data_manifest: + rewritten_manifest.first_row_id = manifest_file.first_row_id + existing_files.append(rewritten_manifest) return existing_files @@ -648,23 +739,51 @@ def _deleted_entries(self) -> list[ManifestEntry]: raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") executor = ExecutorFactory.get_or_create() + is_v3 = self._transaction.table_metadata.format_version >= 3 manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: if not manifest_evaluators[manifest.partition_spec_id](manifest): return [] - return [ - ManifestEntry.from_args( - status=ManifestEntryStatus.DELETED, - snapshot_id=self._snapshot_id, - sequence_number=entry.sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, + deleted_entries: list[ManifestEntry] = [] + is_v3_data_manifest = is_v3 and manifest.content == ManifestContent.DATA + row_id_cursor: int | None = manifest.first_row_id if is_v3_data_manifest else None + for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True): + materialized_data_file = entry.data_file + is_deleted_data_file = ( + entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files ) - for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) - if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files - ] + if is_v3_data_manifest: + explicit_first_row_id = materialized_data_file.first_row_id + if explicit_first_row_id is not None: + row_id_cursor = explicit_first_row_id + if is_deleted_data_file: + if row_id_cursor is None: + raise NotImplementedError( + "Cannot perform a v3 overwrite that preserves row lineage: the source " + "manifest is missing a first_row_id and the deleted data file " + f"{materialized_data_file.file_path} has no explicit field-142 " + "first_row_id." + ) + if explicit_first_row_id is None: + materialized_data_file = copy(materialized_data_file) + materialized_data_file.first_row_id = row_id_cursor + + if is_deleted_data_file: + deleted_entries.append( + ManifestEntry.from_args( + status=ManifestEntryStatus.DELETED, + snapshot_id=self._snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=materialized_data_file, + ) + ) + if is_v3_data_manifest and row_id_cursor is not None: + row_id_cursor += materialized_data_file.record_count + + return deleted_entries list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) return list(itertools.chain(*list_of_entries)) @@ -751,9 +870,59 @@ def _group_by_spec(self, manifests: list[ManifestFile]) -> dict[int, list[Manife return groups def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> ManifestFile: + format_version = self._snapshot_producer._transaction.table_metadata.format_version + # For v3 the merged manifest inherits min(first_row_id) and represents the contiguous + # range [min, min + total_rows). Reading it back assigns row ids to its DATA files in + # ENTRY order, so we must write entries in ascending source-manifest row-id order; + # otherwise inheritance would silently re-number rows. The writer emits manifests + # newest-first, so we sort here. DATA manifests carrying a first_row_id sort by it; + # any without (or delete manifests) keep a stable relative position at the front. + if format_version >= 3: + ordered_bin = sorted( + manifest_bin, + key=lambda manifest: ( + manifest.first_row_id + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + else -1 + ), + ) + else: + ordered_bin = manifest_bin + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: - for manifest in manifest_bin: + for manifest in ordered_bin: + is_v3_data_manifest = format_version >= 3 and manifest.content == ManifestContent.DATA + row_id_cursor: int | None = manifest.first_row_id if is_v3_data_manifest else None for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): + if is_v3_data_manifest: + materialized_data_file = entry.data_file + explicit_first_row_id = materialized_data_file.first_row_id + if explicit_first_row_id is not None: + row_id_cursor = explicit_first_row_id + carries_existing_rows = not ( + entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id + ) + writes_existing_or_deleted = carries_existing_rows and ( + entry.status != ManifestEntryStatus.DELETED + or entry.snapshot_id == self._snapshot_producer.snapshot_id + ) + if writes_existing_or_deleted and explicit_first_row_id is None: + if row_id_cursor is None: + raise NotImplementedError( + "Cannot merge v3 manifests while preserving row lineage: the source " + "manifest is missing a first_row_id and data file " + f"{materialized_data_file.file_path} has no explicit field-142 " + "first_row_id." + ) + materialized_data_file = copy(materialized_data_file) + materialized_data_file.first_row_id = row_id_cursor + entry = ManifestEntry.from_args( + status=entry.status, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=materialized_data_file, + ) if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id: # only files deleted by this snapshot should be added to the new manifest writer.delete(entry) @@ -763,22 +932,88 @@ def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> Ma elif entry.status != ManifestEntryStatus.DELETED: # add all non-deleted files from the old manifest as existing files writer.existing(entry) - - return writer.to_manifest_file() + if is_v3_data_manifest and row_id_cursor is not None: + row_id_cursor += entry.data_file.record_count + + merged_manifest = writer.to_manifest_file() + inherited_first_row_ids = [ + manifest.first_row_id + for manifest in manifest_bin + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + ] + if inherited_first_row_ids: + merged_manifest.first_row_id = min(inherited_first_row_ids) + + return merged_manifest + + def _v3_row_ids_are_contiguous(self, manifest_bin: list[ManifestFile]) -> bool: + """Whether the v3 DATA manifests cover a single gapless, non-overlapping row-id range. + + A merged manifest inherits ``min(first_row_id)`` and, because the entries are written + in ascending row-id order (see ``_create_manifest``), it represents the contiguous + range ``[min, min + total_rows)``. That is only correct when the source manifests' + ranges tile that interval exactly. We therefore SORT the ranges by ``first_row_id`` + and verify they are gapless/non-overlapping. The input order does NOT matter — the + writer emits manifests newest-first (descending), so requiring ascending input here + would (and previously did) disable v3 merging entirely. + """ + assigned = [ + manifest + for manifest in manifest_bin + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + ] + if len(assigned) <= 1: + return True + + sorted_assigned = sorted(assigned, key=lambda manifest: manifest.first_row_id or 0) + + cursor = sorted_assigned[0].first_row_id + if cursor is None: + return False + + for manifest in sorted_assigned: + existing_rows_count = manifest.existing_rows_count + added_rows_count = manifest.added_rows_count + if existing_rows_count is None or added_rows_count is None: + return False + if manifest.first_row_id != cursor: + return False + cursor += existing_rows_count + added_rows_count + + return True def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: list[ManifestFile]) -> list[ManifestFile]: packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) bins: list[list[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) + format_version = self._snapshot_producer._transaction.table_metadata.format_version def merge_bin(manifest_bin: list[ManifestFile]) -> list[ManifestFile]: output_manifests = [] if len(manifest_bin) == 1: output_manifests.append(manifest_bin[0]) + elif ( + format_version >= 3 + and first_manifest in manifest_bin + and first_manifest.content == ManifestContent.DATA + and first_manifest.first_row_id is None + ): + remaining_manifests = [manifest for manifest in manifest_bin if manifest != first_manifest] + output_manifests.append(first_manifest) + if len(remaining_manifests) == 1: + output_manifests.append(remaining_manifests[0]) + elif len(remaining_manifests) < self._min_count_to_merge: + output_manifests.extend(remaining_manifests) + elif not self._v3_row_ids_are_contiguous(remaining_manifests): + output_manifests.extend(remaining_manifests) + else: + output_manifests.append(self._create_manifest(spec_id, remaining_manifests)) elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: # if the bin has the first manifest (the new data files or an appended manifest file) then only # merge it if the number of manifests is above the minimum count. this is applied only to bins # with an in-memory manifest so that large manifests don't prevent merging older groups. output_manifests.extend(manifest_bin) + elif format_version >= 3 and not self._v3_row_ids_are_contiguous(manifest_bin): + output_manifests.extend(manifest_bin) else: output_manifests.append(self._create_manifest(spec_id, manifest_bin)) diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 137215ebc8..359ef339e1 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -164,6 +164,8 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: del v2_entry["file_sequence_number"] del v2_entry["data_file"]["content"] del v2_entry["data_file"]["equality_ids"] + # first_row_id (field 142) is a v3-only DataFile field, not present in the V1/V2 schema. + v2_entry["data_file"].pop("first_row_id", None) # Required in V1 v2_entry["data_file"]["block_size_in_bytes"] = DEFAULT_BLOCK_SIZE @@ -222,7 +224,10 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: fa_entry = next(it) - assert todict(entry) == fa_entry + v2_entry = todict(entry) + # first_row_id (field 142) is a v3-only DataFile field, not present in the V2 schema. + v2_entry["data_file"].pop("first_row_id", None) + assert v2_entry == fa_entry @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7e64e6e7c0..06c85aec3e 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1718,6 +1718,7 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> summary=Summary(Operation.APPEND), schema_id=3, first_row_id=0, + added_rows=10, ) with pytest.raises( @@ -1727,6 +1728,47 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) +def test_add_snapshot_update_fails_without_added_rows(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=1, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot without added rows", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_fails_with_null_table_next_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=1, + added_rows=10, + ) + + with pytest.raises( + ValueError, + match="Cannot add a snapshot when table next-row-id is null", + ): + update_table_metadata( + table_v3.metadata.model_copy(update={"next_row_id": None}), (AddSnapshotUpdate(snapshot=new_snapshot),) + ) + + def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: new_snapshot = Snapshot( snapshot_id=25, @@ -1741,7 +1783,7 @@ def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: ) new_metadata = update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) - assert new_metadata.next_row_id == 11 + assert new_metadata.next_row_id == 12 def model_roundtrips(model: BaseModel) -> bool: diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index c163c90626..6a99ef0c2b 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -184,12 +184,9 @@ def test_serialize_v2(example_table_metadata_v2: dict[str, Any]) -> None: def test_serialize_v3(example_table_metadata_v3: dict[str, Any]) -> None: - # Writing will be part of https://github.com/apache/iceberg-python/issues/1551 - - with pytest.raises(NotImplementedError) as exc_info: - _ = TableMetadataV3(**example_table_metadata_v3).model_dump_json() - - assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value) + table_metadata = TableMetadataV3(**example_table_metadata_v3).model_dump_json() + expected = """{"location":"s3://bucket/test/location","table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true},{"id":4,"name":"u","type":"unknown","required":true},{"id":5,"name":"ns","type":"timestamp_ns","required":true},{"id":6,"name":"nstz","type":"timestamptz_ns","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"current-snapshot-id":3055729675574597004,"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"}},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000},"main":{"snapshot-id":3055729675574597004,"type":"branch"}},"statistics":[],"partition-statistics":[],"format-version":3,"last-sequence-number":34,"next-row-id":1}""" + assert table_metadata == expected def test_migrate_v1_schemas(example_table_metadata_v1: dict[str, Any]) -> None: @@ -837,6 +834,7 @@ def test_new_table_metadata_with_v3_schema() -> None: default_sort_order_id=1, refs={}, format_version=3, + next_row_id=0, ) assert actual.model_dump() == expected.model_dump() diff --git a/tests/table/test_v3_row_lineage.py b/tests/table/test_v3_row_lineage.py new file mode 100644 index 0000000000..7ca77aa013 --- /dev/null +++ b/tests/table/test_v3_row_lineage.py @@ -0,0 +1,640 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Acceptance tests for v3 write gate + row-lineage assignment. + +T1-foundation: exercises the real local write path end to end, asserting that +v3 row lineage (next-row-id / first-row-id / added-rows) is correct and +monotonic across multiple commits, that next-row-id never silently falls back +to None, and that the full v3 metadata round-trips through JSON. +""" + +import json +from pathlib import Path +from typing import cast + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.manifest import DataFile, ManifestContent, ManifestEntryStatus +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV3 +from pyiceberg.table.snapshots import Operation, Snapshot, Summary +from pyiceberg.table.update import AddSnapshotUpdate, update_table_metadata +from pyiceberg.types import IntegerType, NestedField, StringType + + +@pytest.fixture +def v3_catalog(tmp_path: Path) -> Catalog: + return InMemoryCatalog("t1", warehouse=f"file://{tmp_path}") + + +SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=2, name="name", field_type=StringType(), required=False), +) + +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), + ] +) + + +def _batch(ids: list[int]) -> pa.Table: + return pa.Table.from_pylist( + [{"id": i, "name": f"row-{i}"} for i in ids], + schema=ARROW_SCHEMA, + ) + + +def test_v3_table_creation_starts_next_row_id_at_zero(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + assert tbl.metadata.format_version == 3 + assert tbl.metadata.next_row_id == 0 + + +def test_v3_append_twice_row_lineage_is_monotonic(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + assert tbl.metadata.next_row_id == 0 + assert tbl.metadata.next_row_id is not None + + # First append: 3 rows. + tbl.append(_batch([1, 2, 3])) + tbl = v3_catalog.load_table("ns.t") + + snap1 = tbl.metadata.current_snapshot() + assert snap1 is not None + assert snap1.first_row_id == 0, "first snapshot must start assigning at row id 0" + assert snap1.added_rows == 3, "added_rows must reflect the 3 rows written, not None" + assert tbl.metadata.next_row_id == 3, "next_row_id must advance by added rows" + assert tbl.metadata.next_row_id is not None, "next_row_id must NEVER fall back to None on v3" + + # Second append: 2 rows. + tbl.append(_batch([4, 5])) + tbl = v3_catalog.load_table("ns.t") + + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) + snap2 = snaps[-1] + assert snap2.first_row_id == 3, "second snapshot must start where the first left off" + assert snap2.added_rows == 2 + assert tbl.metadata.next_row_id == 5, "next_row_id must be strictly monotonically increasing" + + # Monotonicity across all snapshots: each first_row_id + added_rows chains. + running = 0 + for s in snaps: + assert s.first_row_id is not None + assert s.added_rows is not None + assert s.first_row_id == running, f"gap/overlap in row-id assignment at snapshot {s.snapshot_id}" + running += s.added_rows + assert running == tbl.metadata.next_row_id + + +def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog, monkeypatch: pytest.MonkeyPatch) -> None: + """Merge-append must actually MERGE manifests and must NOT double-count existing rows. + + This is the #3070 double-count regression guard. We instrument the merge manager so the + test fails if no manifest merge ever happens (the historical bug: v3 merging was silently + disabled by a descending-vs-ascending ordering check, so this fix was dead code). + """ + from pyiceberg.table.update import snapshot as snapshot_module + + merge_calls = {"count": 0} + original_create = snapshot_module._ManifestMergeManager._create_manifest + + def _counting_create(self, spec_id, manifest_bin): # type: ignore[no-untyped-def] + merge_calls["count"] += 1 + return original_create(self, spec_id, manifest_bin) + + monkeypatch.setattr(snapshot_module._ManifestMergeManager, "_create_manifest", _counting_create) + + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table( + "ns.t", + schema=SCHEMA, + properties={ + "format-version": "3", + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "1", + }, + ) + + for ids in ([1, 2, 3], [4, 5], [6, 7, 8, 9]): + tbl.append(_batch(ids)) + tbl = v3_catalog.load_table("ns.t") + + # The merge path MUST have run at least once; otherwise the double-count fix is dead code. + assert merge_calls["count"] > 0, "v3 manifest merge never ran — merging is silently disabled" + + # The data manifests must have been compacted (3 appends -> fewer than 3 DATA manifests). + snap = tbl.metadata.current_snapshot() + assert snap is not None + data_manifests = [m for m in snap.manifests(tbl.io) if m.content == ManifestContent.DATA] + assert len(data_manifests) < 3, "manifests were not actually merged" + + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) + added_rows = [s.added_rows for s in snaps] + + # next_row_id == 9 proves no double-counting (9 real rows, not 17). + assert tbl.metadata.next_row_id == 9 + assert added_rows == [3, 2, 4] + assert all(rows is not None for rows in added_rows) + assert sum(rows for rows in added_rows if rows is not None) == tbl.metadata.next_row_id + + # The merged manifests must tile [0, 9) exactly, with each data file's row range coherent. + assigned = sorted( + ((cast(int, m.first_row_id), cast(int, m.existing_rows_count) + cast(int, m.added_rows_count)) for m in data_manifests), + key=lambda pair: pair[0], + ) + cursor = 0 + for first_row_id, rows in assigned: + assert first_row_id == cursor, "merged manifest row-id ranges have a gap/overlap" + cursor += rows + assert cursor == 9 + + # The data is still fully readable and correct after merging. + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [1, 2, 3, 4, 5, 6, 7, 8, 9] + + tbl.append(_batch([])) + tbl = v3_catalog.load_table("ns.t") + + snap = tbl.metadata.current_snapshot() + assert snap is not None + merged_manifests = [m for m in snap.manifests(tbl.io) if m.content == ManifestContent.DATA and m.existing_files_count] + assert len(merged_manifests) == 1 + explicit_ranges = [ + ( + cast(int, entry.data_file.first_row_id), + entry.data_file.record_count, + ) + for manifest in merged_manifests + for entry in manifest.fetch_manifest_entry(tbl.io, discard_deleted=True) + ] + assert all(first_row_id is not None for first_row_id, _ in explicit_ranges) + explicit_ranges = sorted(explicit_ranges) + cursor = 0 + for first_row_id, rows in explicit_ranges: + assert first_row_id == cursor + cursor += rows + assert cursor == 9 + + +def test_v3_manifest_carries_first_row_id(v3_catalog: Catalog) -> None: + """The data manifest in the manifest list must be assigned a first_row_id per spec.""" + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([1, 2, 3])) + tbl = v3_catalog.load_table("ns.t") + + snap = tbl.metadata.current_snapshot() + assert snap is not None + manifests = snap.manifests(tbl.io) + data_manifests = [m for m in manifests if m.content == ManifestContent.DATA] + assert len(data_manifests) >= 1 + # The first (only) data manifest must carry the snapshot's first_row_id (0). + assert data_manifests[0].first_row_id == 0 + + +def test_v3_metadata_round_trips_through_json(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([1, 2, 3])) + tbl.append(_batch([4, 5])) + tbl = v3_catalog.load_table("ns.t") + + meta = tbl.metadata + assert isinstance(meta, TableMetadataV3) + + dumped = meta.model_dump_json() + reparsed = TableMetadataUtil.parse_raw(dumped) + assert isinstance(reparsed, TableMetadataV3) + assert reparsed.next_row_id == meta.next_row_id == 5 + # Round-trip equality of the model. + assert reparsed == meta + # The serialized JSON must include next-row-id. + assert json.loads(dumped)["next-row-id"] == 5 + + +def _data_file_row_ids(tbl: object) -> list[tuple[int | None, int | None, int]]: + """Return (manifest.first_row_id, data_file.first_row_id (field 142), record_count) for DATA entries.""" + snap = tbl.metadata.current_snapshot() # type: ignore[attr-defined] + out: list[tuple[int | None, int | None, int]] = [] + for m in snap.manifests(tbl.io): # type: ignore[attr-defined] + if m.content != ManifestContent.DATA: + continue + for entry in m.fetch_manifest_entry(tbl.io, discard_deleted=True): # type: ignore[attr-defined] + out.append((m.first_row_id, entry.data_file.first_row_id, entry.data_file.record_count)) + return out + + +def _data_files(tbl: Table) -> list[DataFile]: + snap = tbl.metadata.current_snapshot() + assert snap is not None + return [ + entry.data_file + for manifest in snap.manifests(tbl.io) + if manifest.content == ManifestContent.DATA + for entry in manifest.fetch_manifest_entry(tbl.io, discard_deleted=True) + ] + + +def _append_data_files_in_one_manifest(tbl: Table, id_groups: list[list[int]]) -> list[DataFile]: + import itertools + import uuid + + from pyiceberg.io.pyarrow import _dataframe_to_data_files + + data_files: list[DataFile] = [] + with tbl.transaction() as txn: + with txn.update_snapshot().fast_append() as fast_append: + for ids in id_groups: + for data_file in _dataframe_to_data_files( + io=tbl.io, + df=_batch(ids), + table_metadata=txn.table_metadata, + write_uuid=uuid.uuid4(), + counter=itertools.count(), + ): + data_files.append(data_file) + fast_append.append_data_file(data_file) + return data_files + + +def test_v3_overwrite_delete_in_shared_manifest_preserves_survivor_row_ids(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + _append_data_files_in_one_manifest(tbl, [[0, 1, 2], [100, 101, 102]]) + tbl = v3_catalog.load_table("ns.t") + first_file = _data_files(tbl)[0] + assert tbl.metadata.next_row_id == 6 + assert [m[0] for m in _data_file_row_ids(tbl)] == [0, 0] + + with tbl.transaction() as txn: + with txn.update_snapshot().overwrite() as overwrite: + overwrite.delete_data_file(first_file) + tbl = v3_catalog.load_table("ns.t") + + assert tbl.metadata.next_row_id == 6 + overwrite_snap = tbl.metadata.current_snapshot() + assert overwrite_snap is not None + assert overwrite_snap.added_rows == 0 + + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + manifest_frid, datafile_frid, rows = surviving[0] + assert rows == 3 + assert manifest_frid == 0 + assert datafile_frid == 3 + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [100, 101, 102] + + +def test_v3_overwrite_delete_fails_when_survivor_row_id_cannot_be_preserved(tmp_path: Path) -> None: + """When a survivor cannot have its row id preserved, the v3 overwrite must fail loudly. + + Genuine un-preservable case: two data files share ONE manifest written while the table was + still v2 (manifest first_row_id is null and the data files have no field-142). After + upgrading to v3, deleting one file via the overwrite path would rewrite the manifest with + the survivor whose absolute row id is unknown, so the writer must NOT silently renumber it. + """ + cat = InMemoryCatalog("t1undp", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + _append_data_files_in_one_manifest(tbl, [[0, 1, 2], [100, 101, 102]]) + tbl = cat.load_table("ns.t") + + with tbl.transaction() as txn: + txn.upgrade_table_version(3) + tbl = cat.load_table("ns.t") + first_file = _data_files(tbl)[0] + + with pytest.raises(NotImplementedError, match="row lineage"): + with tbl.transaction() as txn: + with txn.update_snapshot().overwrite() as overwrite: + overwrite.delete_data_file(first_file) + + +def test_v3_whole_file_delete_does_not_renumber_surviving_rows(v3_catalog: Catalog) -> None: + """Copy-on-write whole-file delete must NEVER re-number the surviving rows. + + Two separate data files are written (row ids [0,1,2] and [3,4,5]). Deleting the whole + first file (predicate aligned to a full file) must: + - leave next_row_id unchanged (0 new rows assigned), and + - keep the surviving file's row-id lineage intact (manifest first_row_id == 3, and the + materialized data-file field-142 first_row_id == 3) — NOT renumbered to 0. + This asserts on the _row_id (field 142 / manifest first_row_id), not the user `id` column. + """ + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + # Two separate files: ids 0,1,2 (row ids 0-2) then ids 10,11,12 (row ids 3-5). + tbl.append(_batch([0, 1, 2])) + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([10, 11, 12])) + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + + before = sorted(_data_file_row_ids(tbl), key=lambda triple: triple[0] or 0) + # manifest first_row_ids should be 0 and 3. + assert [triple[0] for triple in before] == [0, 3] + + # Delete the entire first file (all ids < 5). This is a metadata-only (whole-file) delete. + tbl.delete(delete_filter="id < 5") + tbl = v3_catalog.load_table("ns.t") + + # next_row_id must NOT advance — zero rows were newly assigned. + assert tbl.metadata.next_row_id == 6, "whole-file delete must not assign new row ids" + delete_snap = tbl.metadata.current_snapshot() + assert delete_snap is not None + assert delete_snap.added_rows == 0, "a delete must report 0 added rows, not re-numbered survivors" + + # The surviving file must KEEP its original row-id lineage (manifest first_row_id == 3). + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + surviving_manifest_frid, surviving_datafile_frid, surviving_rows = surviving[0] + assert surviving_rows == 3 + assert surviving_manifest_frid == 3, "surviving rows must keep their original first_row_id (3), not be renumbered" + + # Data correctness. + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [10, 11, 12] + + +def test_v3_whole_file_delete_with_two_survivors_renumbers_none(v3_catalog: Catalog) -> None: + """Three files; delete the middle file wholesale; the two survivors keep their row ids.""" + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([0, 1])) # row ids 0-1 + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([10, 11])) # row ids 2-3 (deleted) + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([20, 21])) # row ids 4-5 + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + + # delete the middle file (ids 10,11) wholesale using a bound-provable range predicate + # (min=10,max=11 fully inside [10,20)), so this is a metadata-only whole-file delete. + tbl.delete(delete_filter="id >= 10 and id < 20") + tbl = v3_catalog.load_table("ns.t") + + assert tbl.metadata.next_row_id == 6, "no new row ids on a whole-file delete" + frids = sorted(cast(int, triple[0]) for triple in _data_file_row_ids(tbl)) + # survivors must keep first_row_ids 0 and 4 (NOT renumbered to 0 and 2). + assert frids == [0, 4], "survivors were re-numbered after deleting the middle file" + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [0, 1, 20, 21] + + +def test_v3_whole_file_delete_in_shared_manifest_preserves_survivor_row_ids(v3_catalog: Catalog) -> None: + """When two data files share ONE manifest and one is deleted wholesale, the survivor must + keep its row-id lineage. + + This is the strongest delete-lineage guard: the surviving file is REWRITTEN into a new + manifest (the shared source manifest is dropped), so without preserving lineage the + manifest-list writer renumbers the survivor (the historical bug: next_row_id jumped 6->9, + survivor block frid 0->6). The fix materializes the survivor's absolute _row_id into + DataFile field 142 and inherits the source manifest's first_row_id. + """ + import itertools + import uuid + + from pyiceberg.io.pyarrow import _dataframe_to_data_files + + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + # Two data files in a SINGLE manifest (one fast-append commit): row ids 0-2 and 3-5. + with tbl.transaction() as txn: + with txn.update_snapshot().fast_append() as fast_append: + for ids in ([0, 1, 2], [100, 101, 102]): + for data_file in _dataframe_to_data_files( + io=tbl.io, + df=_batch(ids), + table_metadata=txn.table_metadata, + write_uuid=uuid.uuid4(), + counter=itertools.count(), + ): + fast_append.append_data_file(data_file) + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + # one shared manifest with first_row_id == 0 + assert [m[0] for m in _data_file_row_ids(tbl)] == [0, 0] + + # Whole-file delete the first file (ids 0,1,2): bound-provable (max=2 < 5). + tbl.delete(delete_filter="id < 5") + tbl = v3_catalog.load_table("ns.t") + + # next_row_id must NOT advance and no new rows are added. + assert tbl.metadata.next_row_id == 6, "survivor was re-numbered (next_row_id advanced)" + delete_snap = tbl.metadata.current_snapshot() + assert delete_snap is not None + assert delete_snap.added_rows == 0 + + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + manifest_frid, datafile_frid, rows = surviving[0] + assert rows == 3 + # The rewritten manifest inherits the source manifest's first_row_id (0)... + assert manifest_frid == 0, "rewritten manifest must inherit the source manifest first_row_id" + # ...and the survivor's absolute _row_id (3) is materialized into field 142. + assert datafile_frid == 3, "survivor's _row_id (field 142) must be materialized to its original value (3)" + + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [100, 101, 102] + + +def test_v3_whole_file_delete_materializes_deleted_entry_first_row_id(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + _append_data_files_in_one_manifest(tbl, [[0, 1, 2], [100, 101, 102]]) + tbl = v3_catalog.load_table("ns.t") + assert _data_file_row_ids(tbl) == [(0, None, 3), (0, None, 3)] + + tbl.delete(delete_filter="id >= 100") + tbl = v3_catalog.load_table("ns.t") + + snap = tbl.metadata.current_snapshot() + assert snap is not None + deleted_entries = [ + entry + for manifest in snap.manifests(tbl.io) + if manifest.content == ManifestContent.DATA + for entry in manifest.fetch_manifest_entry(tbl.io, discard_deleted=False) + if entry.status == ManifestEntryStatus.DELETED + ] + + assert len(deleted_entries) == 1 + deleted_file = deleted_entries[0].data_file + assert deleted_file.record_count == 3 + assert deleted_file.first_row_id == 3 + + +def test_v3_partial_rewrite_delete_fails_loudly(v3_catalog: Catalog) -> None: + """A copy-on-write delete that needs to REWRITE a data file must fail loudly on v3. + + Preserving _row_id lineage across a physical rewrite needs materialized per-row _row_id + columns, which PyIceberg does not have. Rather than silently re-numbering survivors, the + v3 path raises NotImplementedError. + """ + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + # Single file containing 0..5; deleting a subset forces a partial rewrite. + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = v3_catalog.load_table("ns.t") + + with pytest.raises(NotImplementedError, match="copy-on-write delete"): + tbl.delete(delete_filter="id in (2, 3)") + + # The table state is unchanged (no corruption, no renumbering). + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 2, 3, 4, 5] + + +def test_v3_partial_rewrite_delete_caught_in_transaction_stages_nothing(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = v3_catalog.load_table("ns.t") + + raised = False + with tbl.transaction() as txn: + try: + txn.delete("id in (2, 3)") + except NotImplementedError as exc: + assert "copy-on-write delete" in str(exc) + raised = True + + assert raised + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 2, 3, 4, 5] + + +def test_v2_partial_rewrite_delete_still_works(tmp_path: Path) -> None: + """The loud v3 failure must NOT regress v2 copy-on-write deletes.""" + cat = InMemoryCatalog("t1v2", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = cat.load_table("ns.t") + tbl.delete(delete_filter="id in (2, 3)") + tbl = cat.load_table("ns.t") + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 4, 5] + + +def test_v3_upgrade_from_v2_seeds_next_row_id(tmp_path: Path) -> None: + """v2 -> v3 upgrade must be reachable via the public API and seed next_row_id = 0.""" + cat = InMemoryCatalog("t1up", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + assert tbl.metadata.format_version == 2 + # v2 metadata does not carry next-row-id at all. + assert getattr(tbl.metadata, "next_row_id", None) is None + + with tbl.transaction() as txn: + txn.upgrade_table_version(3) + tbl = cat.load_table("ns.t") + + assert tbl.metadata.format_version == 3 + assert tbl.metadata.next_row_id == 0, "upgrade to v3 must seed next_row_id at 0" + + # And the upgraded table must support a v3 append with correct row lineage. + tbl.append(_batch([1, 2, 3])) + tbl = cat.load_table("ns.t") + snap = tbl.metadata.current_snapshot() + assert snap is not None + assert snap.first_row_id == 0 + assert snap.added_rows == 3 + assert tbl.metadata.next_row_id == 3 + + +def test_v3_upgrade_with_data_does_not_renumber_carried_v2_files(tmp_path: Path) -> None: + """Upgrading a NON-empty v2 table to v3 must assign each carried-forward file a row id + range exactly once and keep it stable across later commits. + + Regression: the module-level manifest cache is keyed only by manifest_path and previously + returned the pre-upgrade v2 ManifestFile (first_row_id=None). The v3 manifest-list writer + then re-assigned a fresh first_row_id to those carried files on EVERY subsequent commit, + double-counting next_row_id (9 -> 15 instead of 10) and renumbering already-assigned rows. + """ + cat = InMemoryCatalog("t1upd", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + # 5 v2 rows across two data manifests, written BEFORE the upgrade. + tbl.append(_batch([0, 1, 2])) + tbl = cat.load_table("ns.t") + tbl.append(_batch([10, 11])) + tbl = cat.load_table("ns.t") + + with tbl.transaction() as txn: + txn.upgrade_table_version(3) + tbl = cat.load_table("ns.t") + assert tbl.metadata.next_row_id == 0 + + # First v3 append: 4 new rows. The 5 carried v2 rows get assigned ids for the first time. + tbl.append(_batch([100, 101, 102, 103])) + tbl = cat.load_table("ns.t") + snap1 = tbl.metadata.current_snapshot() + assert snap1 is not None + after_first = tbl.metadata.next_row_id + assert after_first == 9, "5 carried rows + 4 new rows must yield next_row_id == 9" + frid_by_path_1 = {m.manifest_path: m.first_row_id for m in snap1.manifests(tbl.io) if m.content == ManifestContent.DATA} + assert all(frid is not None for frid in frid_by_path_1.values()), "carried files must be assigned a first_row_id" + + # Second v3 append: only 1 NEW row. next_row_id must advance by exactly 1, NOT re-count carried rows. + tbl.append(_batch([200])) + tbl = cat.load_table("ns.t") + snap2 = tbl.metadata.current_snapshot() + assert snap2 is not None + assert tbl.metadata.next_row_id == 10, "second append must advance next_row_id by exactly 1 new row" + assert snap2.added_rows == 1 + + # Immutability: every manifest present in both snapshots must keep the SAME first_row_id. + frid_by_path_2 = {m.manifest_path: m.first_row_id for m in snap2.manifests(tbl.io) if m.content == ManifestContent.DATA} + for path in set(frid_by_path_1) & set(frid_by_path_2): + assert frid_by_path_1[path] == frid_by_path_2[path], "carried-forward file was renumbered across snapshots" + + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 2, 10, 11, 100, 101, 102, 103, 200] + + +def test_v3_add_snapshot_update_advances_next_row_id_from_snapshot_first_row_id(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + assert tbl.metadata.next_row_id == 0 + + snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=None, + sequence_number=1, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=tbl.metadata.current_schema_id, + first_row_id=tbl.metadata.next_row_id, + added_rows=4, + ) + + new_metadata = update_table_metadata(tbl.metadata, (AddSnapshotUpdate(snapshot=snapshot),)) + assert new_metadata.next_row_id == 4 diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 40ad4bf221..8542556fcf 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme +from copy import copy from tempfile import TemporaryDirectory import fastavro @@ -52,6 +53,35 @@ def clear_global_manifests_cache() -> None: _manifest_cache.clear() +def test_manifest_file_copy_has_independent_data() -> None: + manifest_file = ManifestFile.from_args( + _table_format_version=3, + manifest_path="s3://bucket/table/metadata/m0.avro", + manifest_length=10, + partition_spec_id=0, + content=ManifestContent.DATA, + sequence_number=1, + min_sequence_number=1, + added_snapshot_id=1, + added_files_count=1, + existing_files_count=0, + deleted_files_count=0, + added_rows_count=3, + existing_rows_count=0, + deleted_rows_count=0, + partitions=[], + key_metadata=None, + first_row_id=None, + ) + + wrapped = copy(manifest_file) + wrapped.first_row_id = 5 + wrapped.min_sequence_number = 9 + + assert manifest_file.first_row_id is None + assert manifest_file.min_sequence_number == 1 + + def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None: with open(avro_file, "rb") as f: reader = fastavro.reader(f) @@ -757,6 +787,7 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: avro_compression="zstandard", ) as list_writer: list_writer.add_manifests([manifest_file1]) + manifest_file1 = _manifests(io, manifest_list1_path)[0] # Create manifest list 2: contains manifest1 and manifest2 (overlapping manifest1) manifest_list2_path = f"{tmp_dir}/manifest-list2.avro" @@ -769,6 +800,7 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: avro_compression="zstandard", ) as list_writer: list_writer.add_manifests([manifest_file1, manifest_file2]) + manifest_file1, manifest_file2 = _manifests(io, manifest_list2_path) # Create manifest list 3: contains all three manifests (overlapping manifest1 and manifest2) manifest_list3_path = f"{tmp_dir}/manifest-list3.avro" @@ -868,6 +900,7 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: ) as list_writer: list_writer.add_manifests(manifest_files[: i + 1]) manifest_list_paths.append(list_path) + manifest_files[: i + 1] = _manifests(io, list_path) # Read all manifest lists all_results = []