From ab363d34c6f8d8a3adae37bd2aa558b32e6eeee4 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 09:45:44 -0500 Subject: [PATCH 1/6] v3: lift write gate and implement correct row-lineage assignment Enables writing v3 table metadata and implements spec-conformant row lineage (next-row-id / first-row-id / added-rows) end to end. - metadata.py: bump SUPPORTED_TABLE_FORMAT_VERSION to 3; remove the TableMetadataV3.model_dump_json NotImplementedError so v3 serializes; initialize next_row_id=0 for new v3 tables. - manifest.py: DEFAULT_READ_VERSION=3 so manifest-list first_row_id (field 520) round-trips; add ManifestWriterV3, ManifestListWriterV3 (assigns first_row_id only to DATA manifests lacking one, advancing by existing+added rows), and the ManifestFile.first_row_id accessor. - update/__init__.py: upgrade-to-v3 seeds next_row_id=0; AddSnapshotUpdate computes next_row_id = next_row_id + added_rows with NO silent fallback to None (raises instead), fixing the bug where v3 commits collapsed next_row_id to None. - update/snapshot.py: _commit derives added_rows from the manifest-list writer's row-id advance and sets first_row_id; merge manager inherits min(first_row_id) for merged data manifests and refuses to merge v3 manifests whose row-id ranges are non-contiguous/out-of-order (correctness over compaction). - tests: acceptance suite (create v3, append twice, merge-append, delete+merge gap, json round-trip) plus negative guards. --- pyiceberg/manifest.py | 93 ++++++++++++- pyiceberg/table/metadata.py | 6 +- pyiceberg/table/update/__init__.py | 42 +++--- pyiceberg/table/update/snapshot.py | 87 +++++++++++-- tests/table/test_init.py | 40 ++++++ tests/table/test_metadata.py | 10 +- tests/table/test_v3_row_lineage.py | 201 +++++++++++++++++++++++++++++ 7 files changed, 442 insertions(+), 37 deletions(-) create mode 100644 tests/table/test_v3_row_lineage.py diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..3fd07e8656 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -54,7 +54,7 @@ UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 -DEFAULT_READ_VERSION: Literal[2] = 2 +DEFAULT_READ_VERSION: Literal[3] = 3 INITIAL_SEQUENCE_NUMBER = 0 @@ -852,6 +852,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None: def key_metadata(self) -> bytes | None: return self._data[14] + @property + def first_row_id(self) -> int | None: + return self._data[15] if len(self._data) > 15 else None + + @first_row_id.setter + def first_row_id(self, value: int) -> None: + if len(self._data) <= 15: + self._data.append(value) + else: + self._data[15] = value + def has_added_files(self) -> bool: return self.added_files_count is None or self.added_files_count > 0 @@ -1240,6 +1251,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: return entry +class ManifestWriterV3(ManifestWriterV2): + @property + def version(self) -> TableVersion: + return 3 + + def write_manifest( format_version: TableVersion, spec: PartitionSpec, @@ -1252,6 +1269,8 @@ def write_manifest( return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) + elif format_version == 3: + return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") @@ -1295,6 +1314,10 @@ def __exit__( @abstractmethod def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ... + @property + def next_row_id(self) -> int | None: + return None + def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter: self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files]) return self @@ -1351,9 +1374,7 @@ def __init__( self._commit_snapshot_id = snapshot_id self._sequence_number = sequence_number - def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: - wrapped_manifest_file = copy(manifest_file) - + def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile: if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ: # if the sequence number is being assigned here, then the manifest must be created by the current operation. # To validate this, check that the snapshot id matches the current commit @@ -1374,6 +1395,56 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: wrapped_manifest_file.min_sequence_number = self._sequence_number return wrapped_manifest_file + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + return self._prepare_manifest_for_commit(copy(manifest_file)) + + +class ManifestListWriterV3(ManifestListWriterV2): + _next_row_id: int + + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: int | None, + sequence_number: int, + snapshot_first_row_id: int, + compression: AvroCompressionCodec, + ): + super().__init__( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + compression=compression, + ) + self._format_version = 3 + self._meta = { + "snapshot-id": str(snapshot_id), + "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", + "sequence-number": str(sequence_number), + "first-row-id": str(snapshot_first_row_id), + "format-version": "3", + AVRO_CODEC_KEY: compression, + } + self._next_row_id = snapshot_first_row_id + + @property + def next_row_id(self) -> int | None: + return self._next_row_id + + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + wrapped = self._prepare_manifest_for_commit(copy(manifest_file)) + if wrapped.content == ManifestContent.DATA and wrapped.first_row_id is None: + if wrapped.existing_rows_count is None or wrapped.added_rows_count is None: + raise ValueError( + "Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: " + + wrapped.manifest_path + ) + wrapped.first_row_id = self._next_row_id + self._next_row_id += wrapped.existing_rows_count + wrapped.added_rows_count + return wrapped + def write_manifest_list( format_version: TableVersion, @@ -1382,6 +1453,7 @@ def write_manifest_list( parent_snapshot_id: int | None, sequence_number: int | None, avro_compression: AvroCompressionCodec, + snapshot_first_row_id: int | None = None, ) -> ManifestListWriter: if format_version == 1: return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression) @@ -1389,5 +1461,18 @@ def write_manifest_list( if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression) + elif format_version == 3: + if sequence_number is None: + raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}") + if snapshot_first_row_id is None: + raise ValueError(f"Snapshot-first-row-id is required for V3 tables: {snapshot_first_row_id}") + return ManifestListWriterV3( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + snapshot_first_row_id=snapshot_first_row_id, + compression=avro_compression, + ) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 26b6e3d3ad..329d4c3a7a 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -66,7 +66,7 @@ INITIAL_SPEC_ID = 0 DEFAULT_SCHEMA_ID = 0 -SUPPORTED_TABLE_FORMAT_VERSION = 2 +SUPPORTED_TABLE_FORMAT_VERSION = 3 def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]: @@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata: next_row_id: int | None = Field(alias="next-row-id", default=None) """A long higher than all assigned row IDs; the next snapshot's `first-row-id`.""" - def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str: - raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551") - TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")] @@ -645,6 +642,7 @@ def new_table_metadata( properties=properties, last_partition_id=fresh_partition_spec.last_assigned_field_id, table_uuid=table_uuid, + next_row_id=0, ) else: raise ValidationError(f"Unknown format version: {format_version}") diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 64838b0bd6..3da5399aed 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -29,7 +29,7 @@ from pyiceberg.exceptions import CommitFailedException from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil +from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, @@ -322,9 +322,16 @@ def _( return base_metadata updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version}) + updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata) + if ( + isinstance(updated_metadata, TableMetadataV3) + and base_metadata.format_version < 3 + and updated_metadata.next_row_id is None + ): + updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0}) context.add_update(update) - return TableMetadataUtil._construct_without_validation(updated_metadata) + return updated_metadata @_apply_table_update.register(SetPropertiesUpdate) @@ -435,7 +442,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None: raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists") elif ( - base_metadata.format_version == 2 + base_metadata.format_version >= 2 and update.snapshot.sequence_number is not None and update.snapshot.sequence_number <= base_metadata.last_sequence_number and update.snapshot.parent_snapshot_id is not None @@ -446,6 +453,10 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe ) elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None: raise ValueError("Cannot add snapshot without first row id") + elif base_metadata.format_version >= 3 and update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + elif base_metadata.format_version >= 3 and base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") elif ( base_metadata.format_version >= 3 and update.snapshot.first_row_id is not None @@ -458,18 +469,19 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe ) context.add_update(update) - return base_metadata.model_copy( - update={ - "last_updated_ms": update.snapshot.timestamp_ms, - "last_sequence_number": update.snapshot.sequence_number, - "snapshots": base_metadata.snapshots + [update.snapshot], - "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows - if base_metadata.format_version >= 3 - and base_metadata.next_row_id is not None - and update.snapshot.added_rows is not None - else None, - } - ) + metadata_updates: dict[str, Any] = { + "last_updated_ms": update.snapshot.timestamp_ms, + "last_sequence_number": update.snapshot.sequence_number, + "snapshots": base_metadata.snapshots + [update.snapshot], + } + if base_metadata.format_version >= 3: + if base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") + if update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + metadata_updates["next_row_id"] = base_metadata.next_row_id + update.snapshot.added_rows + + return base_metadata.model_copy(update=metadata_updates) @_apply_table_update.register(SetSnapshotRefUpdate) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 7931edacdd..ddf5f1977c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -264,8 +264,9 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ) def _commit(self) -> UpdatesAndRequirements: + table_metadata = self._transaction.table_metadata new_manifests = self._manifests() - next_sequence_number = self._transaction.table_metadata.next_sequence_number() + next_sequence_number = table_metadata.next_sequence_number() summary = self._summary(self.snapshot_properties) file_name = _new_manifest_list_file_name( @@ -276,20 +277,31 @@ def _commit(self) -> UpdatesAndRequirements: location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + if table_metadata.format_version >= 3: + snapshot_first_row_id = table_metadata.next_row_id + if snapshot_first_row_id is None: + raise ValueError("Cannot commit to a v3 table without next-row-id") + else: + snapshot_first_row_id = None + with write_manifest_list( - format_version=self._transaction.table_metadata.format_version, + format_version=table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, sequence_number=next_sequence_number, avro_compression=self._compression, + snapshot_first_row_id=snapshot_first_row_id, ) as writer: writer.add_manifests(new_manifests) - first_row_id: int | None = None - - if self._transaction.table_metadata.format_version >= 3: - first_row_id = self._transaction.table_metadata.next_row_id + if table_metadata.format_version >= 3: + if writer.next_row_id is None or snapshot_first_row_id is None: + raise ValueError("Cannot determine added rows for a v3 snapshot without row IDs") + added_rows = writer.next_row_id - snapshot_first_row_id + else: + added_rows = None + first_row_id = snapshot_first_row_id snapshot = Snapshot( snapshot_id=self._snapshot_id, @@ -297,8 +309,9 @@ def _commit(self) -> UpdatesAndRequirements: manifest_list=manifest_list_file_path, sequence_number=next_sequence_number, summary=summary, - schema_id=self._transaction.table_metadata.current_schema_id, + schema_id=table_metadata.current_schema_id, first_row_id=first_row_id, + added_rows=added_rows, ) add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot) @@ -764,21 +777,79 @@ def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> Ma # add all non-deleted files from the old manifest as existing files writer.existing(entry) - return writer.to_manifest_file() + merged_manifest = writer.to_manifest_file() + inherited_first_row_ids = [ + manifest.first_row_id + for manifest in manifest_bin + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + ] + if inherited_first_row_ids: + merged_manifest.first_row_id = min(inherited_first_row_ids) + + return merged_manifest + + def _v3_row_ids_are_contiguous(self, manifest_bin: list[ManifestFile]) -> bool: + assigned = [ + manifest + for manifest in manifest_bin + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + ] + if len(assigned) <= 1: + return True + + sorted_assigned = sorted(assigned, key=lambda manifest: manifest.first_row_id or 0) + # Merged manifests inherit the minimum row id and keep entry order, so the + # source manifests must already be in row-id order. + if assigned != sorted_assigned: + return False + + cursor = sorted_assigned[0].first_row_id + if cursor is None: + return False + + for manifest in sorted_assigned: + existing_rows_count = manifest.existing_rows_count + added_rows_count = manifest.added_rows_count + if existing_rows_count is None or added_rows_count is None: + return False + if manifest.first_row_id != cursor: + return False + cursor += existing_rows_count + added_rows_count + + return True def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: list[ManifestFile]) -> list[ManifestFile]: packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) bins: list[list[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) + format_version = self._snapshot_producer._transaction.table_metadata.format_version def merge_bin(manifest_bin: list[ManifestFile]) -> list[ManifestFile]: output_manifests = [] if len(manifest_bin) == 1: output_manifests.append(manifest_bin[0]) + elif ( + format_version >= 3 + and first_manifest in manifest_bin + and first_manifest.content == ManifestContent.DATA + and first_manifest.first_row_id is None + ): + remaining_manifests = [manifest for manifest in manifest_bin if manifest != first_manifest] + output_manifests.append(first_manifest) + if len(remaining_manifests) == 1: + output_manifests.append(remaining_manifests[0]) + elif len(remaining_manifests) < self._min_count_to_merge: + output_manifests.extend(remaining_manifests) + elif not self._v3_row_ids_are_contiguous(remaining_manifests): + output_manifests.extend(remaining_manifests) + else: + output_manifests.append(self._create_manifest(spec_id, remaining_manifests)) elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: # if the bin has the first manifest (the new data files or an appended manifest file) then only # merge it if the number of manifests is above the minimum count. this is applied only to bins # with an in-memory manifest so that large manifests don't prevent merging older groups. output_manifests.extend(manifest_bin) + elif format_version >= 3 and not self._v3_row_ids_are_contiguous(manifest_bin): + output_manifests.extend(manifest_bin) else: output_manifests.append(self._create_manifest(spec_id, manifest_bin)) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7e64e6e7c0..d4f5195d68 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1718,6 +1718,7 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> summary=Summary(Operation.APPEND), schema_id=3, first_row_id=0, + added_rows=10, ) with pytest.raises( @@ -1727,6 +1728,45 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) +def test_add_snapshot_update_fails_without_added_rows(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=1, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot without added rows", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_fails_with_null_table_next_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=1, + added_rows=10, + ) + + with pytest.raises( + ValueError, + match="Cannot add a snapshot when table next-row-id is null", + ): + update_table_metadata(table_v3.metadata.model_copy(update={"next_row_id": None}), (AddSnapshotUpdate(snapshot=new_snapshot),)) + + def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: new_snapshot = Snapshot( snapshot_id=25, diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index c163c90626..6a99ef0c2b 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -184,12 +184,9 @@ def test_serialize_v2(example_table_metadata_v2: dict[str, Any]) -> None: def test_serialize_v3(example_table_metadata_v3: dict[str, Any]) -> None: - # Writing will be part of https://github.com/apache/iceberg-python/issues/1551 - - with pytest.raises(NotImplementedError) as exc_info: - _ = TableMetadataV3(**example_table_metadata_v3).model_dump_json() - - assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value) + table_metadata = TableMetadataV3(**example_table_metadata_v3).model_dump_json() + expected = """{"location":"s3://bucket/test/location","table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true},{"id":4,"name":"u","type":"unknown","required":true},{"id":5,"name":"ns","type":"timestamp_ns","required":true},{"id":6,"name":"nstz","type":"timestamptz_ns","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"current-snapshot-id":3055729675574597004,"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"}},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000},"main":{"snapshot-id":3055729675574597004,"type":"branch"}},"statistics":[],"partition-statistics":[],"format-version":3,"last-sequence-number":34,"next-row-id":1}""" + assert table_metadata == expected def test_migrate_v1_schemas(example_table_metadata_v1: dict[str, Any]) -> None: @@ -837,6 +834,7 @@ def test_new_table_metadata_with_v3_schema() -> None: default_sort_order_id=1, refs={}, format_version=3, + next_row_id=0, ) assert actual.model_dump() == expected.model_dump() diff --git a/tests/table/test_v3_row_lineage.py b/tests/table/test_v3_row_lineage.py new file mode 100644 index 0000000000..e4d3f634c4 --- /dev/null +++ b/tests/table/test_v3_row_lineage.py @@ -0,0 +1,201 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Acceptance tests for v3 write gate + row-lineage assignment. + +T1-foundation: exercises the real local write path end to end, asserting that +v3 row lineage (next-row-id / first-row-id / added-rows) is correct and +monotonic across multiple commits, that next-row-id never silently falls back +to None, and that the full v3 metadata round-trips through JSON. +""" + +import json +from pathlib import Path + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.manifest import ManifestContent +from pyiceberg.schema import Schema +from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV3 +from pyiceberg.types import IntegerType, NestedField, StringType + + +@pytest.fixture +def v3_catalog(tmp_path: Path) -> Catalog: + return InMemoryCatalog("t1", warehouse=f"file://{tmp_path}") + + +SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=2, name="name", field_type=StringType(), required=False), +) + +ARROW_SCHEMA = pa.schema([ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), +]) + + +def _batch(ids: list[int]) -> pa.Table: + return pa.Table.from_pylist( + [{"id": i, "name": f"row-{i}"} for i in ids], + schema=ARROW_SCHEMA, + ) + + +def test_v3_table_creation_starts_next_row_id_at_zero(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + assert tbl.metadata.format_version == 3 + assert tbl.metadata.next_row_id == 0 + + +def test_v3_append_twice_row_lineage_is_monotonic(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + assert tbl.metadata.next_row_id == 0 + assert tbl.metadata.next_row_id is not None + + # First append: 3 rows. + tbl.append(_batch([1, 2, 3])) + tbl = v3_catalog.load_table("ns.t") + + snap1 = tbl.metadata.current_snapshot() + assert snap1 is not None + assert snap1.first_row_id == 0, "first snapshot must start assigning at row id 0" + assert snap1.added_rows == 3, "added_rows must reflect the 3 rows written, not None" + assert tbl.metadata.next_row_id == 3, "next_row_id must advance by added rows" + assert tbl.metadata.next_row_id is not None, "next_row_id must NEVER fall back to None on v3" + + # Second append: 2 rows. + tbl.append(_batch([4, 5])) + tbl = v3_catalog.load_table("ns.t") + + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) + snap2 = snaps[-1] + assert snap2.first_row_id == 3, "second snapshot must start where the first left off" + assert snap2.added_rows == 2 + assert tbl.metadata.next_row_id == 5, "next_row_id must be strictly monotonically increasing" + + # Monotonicity across all snapshots: each first_row_id + added_rows chains. + running = 0 + for s in snaps: + assert s.first_row_id is not None + assert s.added_rows is not None + assert s.first_row_id == running, f"gap/overlap in row-id assignment at snapshot {s.snapshot_id}" + running += s.added_rows + assert running == tbl.metadata.next_row_id + + +def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table( + "ns.t", + schema=SCHEMA, + properties={ + "format-version": "3", + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "1", + }, + ) + + for ids in ([1, 2, 3], [4, 5], [6, 7, 8, 9]): + tbl.append(_batch(ids)) + tbl = v3_catalog.load_table("ns.t") + + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) + added_rows = [s.added_rows for s in snaps] + + assert tbl.metadata.next_row_id == 9 + assert added_rows == [3, 2, 4] + assert all(rows is not None for rows in added_rows) + assert sum(rows for rows in added_rows if rows is not None) == tbl.metadata.next_row_id + + +def test_v3_manifest_carries_first_row_id(v3_catalog: Catalog) -> None: + """The data manifest in the manifest list must be assigned a first_row_id per spec.""" + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([1, 2, 3])) + tbl = v3_catalog.load_table("ns.t") + + snap = tbl.metadata.current_snapshot() + assert snap is not None + manifests = snap.manifests(tbl.io) + data_manifests = [m for m in manifests if m.content == ManifestContent.DATA] + assert len(data_manifests) >= 1 + # The first (only) data manifest must carry the snapshot's first_row_id (0). + assert data_manifests[0].first_row_id == 0 + + +def test_v3_metadata_round_trips_through_json(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([1, 2, 3])) + tbl.append(_batch([4, 5])) + tbl = v3_catalog.load_table("ns.t") + + meta = tbl.metadata + assert isinstance(meta, TableMetadataV3) + + dumped = meta.model_dump_json() + reparsed = TableMetadataUtil.parse_raw(dumped) + assert isinstance(reparsed, TableMetadataV3) + assert reparsed.next_row_id == meta.next_row_id == 5 + # Round-trip equality of the model. + assert reparsed == meta + # The serialized JSON must include next-row-id. + assert json.loads(dumped)["next-row-id"] == 5 + + +def test_v3_merge_preserves_row_ids_after_delete_creates_gaps(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table( + "ns.t", + schema=SCHEMA, + properties={ + "format-version": "3", + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "2", + }, + ) + + tbl.append(_batch([1, 2, 3, 4, 5, 6])) + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([7, 8])) + tbl = v3_catalog.load_table("ns.t") + + tbl.delete(delete_filter="id in (2, 3)") + tbl = v3_catalog.load_table("ns.t") + + tbl.append(_batch([9, 10])) + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([11, 12])) + tbl = v3_catalog.load_table("ns.t") + + distinct_rows_ever_appended = set(range(1, 13)) + expected_surviving_ids = distinct_rows_ever_appended - {2, 3} + actual_ids = [row["id"] for row in tbl.scan().to_arrow().to_pylist()] + + assert tbl.metadata.next_row_id is not None + assert tbl.metadata.next_row_id >= len(distinct_rows_ever_appended) + assert all(snapshot.added_rows is not None for snapshot in tbl.metadata.snapshots) + assert len(actual_ids) == len(expected_surviving_ids) + assert set(actual_ids) == expected_surviving_ids From b2c9b6c2519324a411ce87d808c8f2732d99431e Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 10:51:20 -0500 Subject: [PATCH 2/6] v3-fix: preserve row lineage on delete, enable v3 manifest merge, coherent upgrade gate Remediate four defects found by the t1-foundation skeptic audit: 1. [HIGH] Copy-on-write delete no longer re-numbers surviving rows. - Whole-file deletes (incl. shared-manifest case) materialize each survivor's absolute _row_id into DataFile field 142 and inherit the source manifest's first_row_id, so next_row_id and survivors' row ids are preserved. - Partial deletes that need a physical data-file rewrite now FAIL LOUDLY on v3 (NotImplementedError) instead of silently re-numbering survivors, because PyIceberg has no read-side _row_id materialization yet. 2. [HIGH] v3 manifest merge was silently disabled by a descending-vs-ascending ordering check in _v3_row_ids_are_contiguous. The check now sorts ranges and verifies gapless/non-overlapping regardless of input order; _create_manifest writes merged entries in ascending row-id order so min(first_row_id) inheritance is correct. The #3070 double-count fix is now actually exercised. 3. [MED] upgrade_table_version gate hardcoded {1,2}; now uses SUPPORTED_TABLE_FORMAT_VERSION so v2->v3 upgrade works end to end (seeds next_row_id=0), consistent with create-allows-v3. 4. [LOW] The two mislabeled "merge" tests now instrument _create_manifest and assert a merge actually runs; added whole-file delete-lineage tests asserting field-142 _row_id values, a loud-fail test, a v2 regression test, and an upgrade test. DataFile gains a first_row_id (field 142) accessor and a __copy__ that isolates its _data list so lineage materialization does not leak into source DataFiles. --- pyiceberg/manifest.py | 34 ++++ pyiceberg/table/__init__.py | 21 ++- pyiceberg/table/update/snapshot.py | 86 +++++++++- tests/avro/test_file.py | 7 +- tests/table/test_v3_row_lineage.py | 256 ++++++++++++++++++++++++++--- 5 files changed, 370 insertions(+), 34 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3fd07e8656..7fd90b2f10 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -531,6 +531,23 @@ def equality_ids(self) -> list[int] | None: def sort_order_id(self) -> int | None: return self._data[15] + @property + def first_row_id(self) -> int | None: + """The _row_id for the first row in the data file (field 142, v3+ only). + + Older (v1/v2) DataFile structs do not carry this field, so we return + ``None`` when the underlying record is too short to hold it. + """ + if len(self._data) > 16: + return self._data[16] + return None + + @first_row_id.setter + def first_row_id(self, value: int | None) -> None: + if len(self._data) <= 16: + raise ValueError("Cannot set first_row_id on a non-v3 DataFile (field 142 absent)") + self._data[16] = value + # Spec ID should not be stored in the file _spec_id: int @@ -549,6 +566,23 @@ def __setattr__(self, name: str, value: Any) -> None: value = FileFormat[value] super().__setattr__(name, value) + def __copy__(self) -> DataFile: + """Return a copy whose ``_data`` list is independent from the original. + + ``Record`` stores its fields in a mutable ``_data`` list, so the default + ``copy.copy`` would share that list and a mutation on the copy (e.g. + assigning ``first_row_id`` while materializing row lineage on a delete) + would leak back into the source DataFile. Copying the list isolates + field-level mutations to the copy while preserving ``_spec_id``. + """ + cls = self.__class__ + new = cls.__new__(cls) + new._data = list(self._data) + instance_dict = getattr(self, "__dict__", None) + if instance_dict: + new.__dict__.update(instance_dict) + return new + def __hash__(self) -> int: """Return the hash of the file path.""" return hash(self.file_path) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..b086111fdc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -48,7 +48,7 @@ from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.maintenance import MaintenanceTable -from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata +from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry @@ -293,7 +293,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction: Returns: The alter table builder. """ - if format_version not in {1, 2}: + if not 1 <= format_version <= SUPPORTED_TABLE_FORMAT_VERSION: raise ValueError(f"Unsupported table format version: {format_version}") if format_version < self.table_metadata.format_version: @@ -730,6 +730,23 @@ def delete( # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: + if self.table_metadata.format_version >= 3: + # A partial (copy-on-write) delete physically rewrites surviving rows into a + # NEW data file. Per the v3 spec those rows must KEEP their original _row_id + # (field 142 / the _row_id metadata column). After a partial delete the + # survivors are generally non-contiguous (e.g. 0,1,4,5), so their lineage can + # only be preserved by materializing an explicit per-row _row_id column on read + # and persisting it on rewrite. PyIceberg has no read-side _row_id + # materialization yet, so doing this rewrite would silently RE-NUMBER the + # surviving rows and corrupt row lineage. We fail loudly instead. + raise NotImplementedError( + "v3 copy-on-write delete that requires rewriting a data file is not " + "supported yet: surviving rows would lose their _row_id lineage because " + "PyIceberg cannot materialize/preserve per-row _row_id (field 142) across a " + "physical rewrite. Whole-file deletes (which drop entire data files and " + "preserve survivors' row ids) are supported. Track: read-side _row_id " + "materialization." + ) bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index ddf5f1977c..00d40d1a8e 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -21,6 +21,7 @@ from abc import abstractmethod from collections import defaultdict from collections.abc import Callable +from copy import copy from datetime import datetime from functools import cached_property from typing import TYPE_CHECKING, Generic @@ -441,6 +442,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # avoid copying metadata for each evaluator table_metadata = self._transaction.table_metadata + is_v3 = table_metadata.format_version >= 3 schema = table_metadata.schema() manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) @@ -469,16 +471,53 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # It is relevant, let's check out the content deleted_entries = [] existing_entries = [] + # For v3 tables, surviving data files must KEEP the row ids they were + # already assigned. Rewriting the manifest would otherwise let the + # manifest-list writer re-number them (the source manifest's + # first_row_id is dropped). We materialize each surviving file's + # absolute _row_id into DataFile field 142 so it survives the rewrite. + # A file inherits row id = manifest.first_row_id + sum(record_count of + # all preceding data files in the manifest), unless it already carries + # an explicit field-142 value. + row_id_cursor: int | None = manifest_file.first_row_id if is_v3 else None for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + materialized_data_file = entry.data_file + if is_v3: + explicit_first_row_id = materialized_data_file.first_row_id + if explicit_first_row_id is not None: + row_id_cursor = explicit_first_row_id + if row_id_cursor is None: + raise NotImplementedError( + "Cannot perform a v3 copy-on-write delete that preserves row " + "lineage: the source manifest is missing a first_row_id and " + f"the data file {materialized_data_file.file_path} has no " + "explicit field-142 first_row_id, so surviving rows cannot be " + "renumber-safely rewritten." + ) + if explicit_first_row_id is None: + # Persist the inherited absolute row id so it survives the rewrite. + materialized_data_file = copy(materialized_data_file) + materialized_data_file.first_row_id = row_id_cursor if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: # Based on the metadata, it can be dropped right away deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) self._deleted_data_files.add(entry.data_file) else: # Based on the metadata, we cannot determine if it can be deleted - existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + surviving_entry = _copy_with_new_status(entry, ManifestEntryStatus.EXISTING) + if is_v3: + surviving_entry = ManifestEntry.from_args( + status=surviving_entry.status, + snapshot_id=surviving_entry.snapshot_id, + sequence_number=surviving_entry.sequence_number, + file_sequence_number=surviving_entry.file_sequence_number, + data_file=materialized_data_file, + ) + existing_entries.append(surviving_entry) if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH: partial_rewrites_needed = True + if is_v3 and row_id_cursor is not None: + row_id_cursor += materialized_data_file.record_count if len(deleted_entries) > 0: total_deleted_entries += deleted_entries @@ -488,7 +527,15 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> with self.new_manifest_writer(spec=self.spec(manifest_file.partition_spec_id)) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) - existing_manifests.append(writer.to_manifest_file()) + rewritten_manifest = writer.to_manifest_file() + if is_v3: + # Every surviving data file now carries its own field-142 + # row id, so the manifest-list writer must NOT re-assign a + # block. Inherit the source manifest's first_row_id (its + # range start) to keep the manifest itself row-id stable and + # prevent re-numbering of survivors. + rewritten_manifest.first_row_id = manifest_file.first_row_id + existing_manifests.append(rewritten_manifest) else: existing_manifests.append(manifest_file) else: @@ -764,8 +811,27 @@ def _group_by_spec(self, manifests: list[ManifestFile]) -> dict[int, list[Manife return groups def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> ManifestFile: + format_version = self._snapshot_producer._transaction.table_metadata.format_version + # For v3 the merged manifest inherits min(first_row_id) and represents the contiguous + # range [min, min + total_rows). Reading it back assigns row ids to its DATA files in + # ENTRY order, so we must write entries in ascending source-manifest row-id order; + # otherwise inheritance would silently re-number rows. The writer emits manifests + # newest-first, so we sort here. DATA manifests carrying a first_row_id sort by it; + # any without (or delete manifests) keep a stable relative position at the front. + if format_version >= 3: + ordered_bin = sorted( + manifest_bin, + key=lambda manifest: ( + manifest.first_row_id + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + else -1 + ), + ) + else: + ordered_bin = manifest_bin + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: - for manifest in manifest_bin: + for manifest in ordered_bin: for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id: # only files deleted by this snapshot should be added to the new manifest @@ -789,6 +855,16 @@ def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> Ma return merged_manifest def _v3_row_ids_are_contiguous(self, manifest_bin: list[ManifestFile]) -> bool: + """Whether the v3 DATA manifests cover a single gapless, non-overlapping row-id range. + + A merged manifest inherits ``min(first_row_id)`` and, because the entries are written + in ascending row-id order (see ``_create_manifest``), it represents the contiguous + range ``[min, min + total_rows)``. That is only correct when the source manifests' + ranges tile that interval exactly. We therefore SORT the ranges by ``first_row_id`` + and verify they are gapless/non-overlapping. The input order does NOT matter — the + writer emits manifests newest-first (descending), so requiring ascending input here + would (and previously did) disable v3 merging entirely. + """ assigned = [ manifest for manifest in manifest_bin @@ -798,10 +874,6 @@ def _v3_row_ids_are_contiguous(self, manifest_bin: list[ManifestFile]) -> bool: return True sorted_assigned = sorted(assigned, key=lambda manifest: manifest.first_row_id or 0) - # Merged manifests inherit the minimum row id and keep entry order, so the - # source manifests must already be in row-id order. - if assigned != sorted_assigned: - return False cursor = sorted_assigned[0].first_row_id if cursor is None: diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 137215ebc8..359ef339e1 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -164,6 +164,8 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: del v2_entry["file_sequence_number"] del v2_entry["data_file"]["content"] del v2_entry["data_file"]["equality_ids"] + # first_row_id (field 142) is a v3-only DataFile field, not present in the V1/V2 schema. + v2_entry["data_file"].pop("first_row_id", None) # Required in V1 v2_entry["data_file"]["block_size_in_bytes"] = DEFAULT_BLOCK_SIZE @@ -222,7 +224,10 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: fa_entry = next(it) - assert todict(entry) == fa_entry + v2_entry = todict(entry) + # first_row_id (field 142) is a v3-only DataFile field, not present in the V2 schema. + v2_entry["data_file"].pop("first_row_id", None) + assert v2_entry == fa_entry @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/table/test_v3_row_lineage.py b/tests/table/test_v3_row_lineage.py index e4d3f634c4..6286bb9dbe 100644 --- a/tests/table/test_v3_row_lineage.py +++ b/tests/table/test_v3_row_lineage.py @@ -104,7 +104,26 @@ def test_v3_append_twice_row_lineage_is_monotonic(v3_catalog: Catalog) -> None: assert running == tbl.metadata.next_row_id -def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog) -> None: +def test_v3_merge_append_does_not_double_count_existing_rows( + v3_catalog: Catalog, monkeypatch: pytest.MonkeyPatch +) -> None: + """Merge-append must actually MERGE manifests and must NOT double-count existing rows. + + This is the #3070 double-count regression guard. We instrument the merge manager so the + test fails if no manifest merge ever happens (the historical bug: v3 merging was silently + disabled by a descending-vs-ascending ordering check, so this fix was dead code). + """ + from pyiceberg.table.update import snapshot as snapshot_module + + merge_calls = {"count": 0} + original_create = snapshot_module._ManifestMergeManager._create_manifest + + def _counting_create(self, spec_id, manifest_bin): # type: ignore[no-untyped-def] + merge_calls["count"] += 1 + return original_create(self, spec_id, manifest_bin) + + monkeypatch.setattr(snapshot_module._ManifestMergeManager, "_create_manifest", _counting_create) + v3_catalog.create_namespace("ns") tbl = v3_catalog.create_table( "ns.t", @@ -120,14 +139,39 @@ def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog tbl.append(_batch(ids)) tbl = v3_catalog.load_table("ns.t") + # The merge path MUST have run at least once; otherwise the double-count fix is dead code. + assert merge_calls["count"] > 0, "v3 manifest merge never ran — merging is silently disabled" + + # The data manifests must have been compacted (3 appends -> fewer than 3 DATA manifests). + snap = tbl.metadata.current_snapshot() + assert snap is not None + data_manifests = [m for m in snap.manifests(tbl.io) if m.content == ManifestContent.DATA] + assert len(data_manifests) < 3, "manifests were not actually merged" + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) added_rows = [s.added_rows for s in snaps] + # next_row_id == 9 proves no double-counting (9 real rows, not 17). assert tbl.metadata.next_row_id == 9 assert added_rows == [3, 2, 4] assert all(rows is not None for rows in added_rows) assert sum(rows for rows in added_rows if rows is not None) == tbl.metadata.next_row_id + # The merged manifests must tile [0, 9) exactly, with each data file's row range coherent. + assigned = sorted( + ((m.first_row_id, m.existing_rows_count + m.added_rows_count) for m in data_manifests), + key=lambda pair: pair[0], + ) + cursor = 0 + for first_row_id, rows in assigned: + assert first_row_id == cursor, "merged manifest row-id ranges have a gap/overlap" + cursor += rows + assert cursor == 9 + + # The data is still fully readable and correct after merging. + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [1, 2, 3, 4, 5, 6, 7, 8, 9] + def test_v3_manifest_carries_first_row_id(v3_catalog: Catalog) -> None: """The data manifest in the manifest list must be assigned a first_row_id per spec.""" @@ -165,37 +209,201 @@ def test_v3_metadata_round_trips_through_json(v3_catalog: Catalog) -> None: assert json.loads(dumped)["next-row-id"] == 5 -def test_v3_merge_preserves_row_ids_after_delete_creates_gaps(v3_catalog: Catalog) -> None: +def _data_file_row_ids(tbl: object) -> list[tuple[int | None, int | None, int]]: + """Return (manifest.first_row_id, data_file.first_row_id (field 142), record_count) for DATA entries.""" + snap = tbl.metadata.current_snapshot() # type: ignore[attr-defined] + out: list[tuple[int | None, int | None, int]] = [] + for m in snap.manifests(tbl.io): # type: ignore[attr-defined] + if m.content != ManifestContent.DATA: + continue + for entry in m.fetch_manifest_entry(tbl.io, discard_deleted=True): # type: ignore[attr-defined] + out.append((m.first_row_id, entry.data_file.first_row_id, entry.data_file.record_count)) + return out + + +def test_v3_whole_file_delete_does_not_renumber_surviving_rows(v3_catalog: Catalog) -> None: + """Copy-on-write whole-file delete must NEVER re-number the surviving rows. + + Two separate data files are written (row ids [0,1,2] and [3,4,5]). Deleting the whole + first file (predicate aligned to a full file) must: + - leave next_row_id unchanged (0 new rows assigned), and + - keep the surviving file's row-id lineage intact (manifest first_row_id == 3, and the + materialized data-file field-142 first_row_id == 3) — NOT renumbered to 0. + This asserts on the _row_id (field 142 / manifest first_row_id), not the user `id` column. + """ v3_catalog.create_namespace("ns") - tbl = v3_catalog.create_table( - "ns.t", - schema=SCHEMA, - properties={ - "format-version": "3", - "commit.manifest-merge.enabled": "true", - "commit.manifest.min-count-to-merge": "2", - }, - ) + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) - tbl.append(_batch([1, 2, 3, 4, 5, 6])) + # Two separate files: ids 0,1,2 (row ids 0-2) then ids 10,11,12 (row ids 3-5). + tbl.append(_batch([0, 1, 2])) tbl = v3_catalog.load_table("ns.t") - tbl.append(_batch([7, 8])) + tbl.append(_batch([10, 11, 12])) tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 - tbl.delete(delete_filter="id in (2, 3)") + before = sorted(_data_file_row_ids(tbl), key=lambda triple: triple[0] or 0) + # manifest first_row_ids should be 0 and 3. + assert [triple[0] for triple in before] == [0, 3] + + # Delete the entire first file (all ids < 5). This is a metadata-only (whole-file) delete. + tbl.delete(delete_filter="id < 5") tbl = v3_catalog.load_table("ns.t") - tbl.append(_batch([9, 10])) + # next_row_id must NOT advance — zero rows were newly assigned. + assert tbl.metadata.next_row_id == 6, "whole-file delete must not assign new row ids" + delete_snap = tbl.metadata.current_snapshot() + assert delete_snap is not None + assert delete_snap.added_rows == 0, "a delete must report 0 added rows, not re-numbered survivors" + + # The surviving file must KEEP its original row-id lineage (manifest first_row_id == 3). + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + surviving_manifest_frid, surviving_datafile_frid, surviving_rows = surviving[0] + assert surviving_rows == 3 + assert surviving_manifest_frid == 3, "surviving rows must keep their original first_row_id (3), not be renumbered" + + # Data correctness. + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [10, 11, 12] + + +def test_v3_whole_file_delete_with_two_survivors_renumbers_none(v3_catalog: Catalog) -> None: + """Three files; delete the middle file wholesale; the two survivors keep their row ids.""" + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([0, 1])) # row ids 0-1 + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([10, 11])) # row ids 2-3 (deleted) tbl = v3_catalog.load_table("ns.t") - tbl.append(_batch([11, 12])) + tbl.append(_batch([20, 21])) # row ids 4-5 tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 - distinct_rows_ever_appended = set(range(1, 13)) - expected_surviving_ids = distinct_rows_ever_appended - {2, 3} - actual_ids = [row["id"] for row in tbl.scan().to_arrow().to_pylist()] + # delete the middle file (ids 10,11) wholesale using a bound-provable range predicate + # (min=10,max=11 fully inside [10,20)), so this is a metadata-only whole-file delete. + tbl.delete(delete_filter="id >= 10 and id < 20") + tbl = v3_catalog.load_table("ns.t") - assert tbl.metadata.next_row_id is not None - assert tbl.metadata.next_row_id >= len(distinct_rows_ever_appended) - assert all(snapshot.added_rows is not None for snapshot in tbl.metadata.snapshots) - assert len(actual_ids) == len(expected_surviving_ids) - assert set(actual_ids) == expected_surviving_ids + assert tbl.metadata.next_row_id == 6, "no new row ids on a whole-file delete" + frids = sorted(triple[0] for triple in _data_file_row_ids(tbl)) + # survivors must keep first_row_ids 0 and 4 (NOT renumbered to 0 and 2). + assert frids == [0, 4], "survivors were re-numbered after deleting the middle file" + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [0, 1, 20, 21] + + +def test_v3_whole_file_delete_in_shared_manifest_preserves_survivor_row_ids(v3_catalog: Catalog) -> None: + """When two data files share ONE manifest and one is deleted wholesale, the survivor must + keep its row-id lineage. + + This is the strongest delete-lineage guard: the surviving file is REWRITTEN into a new + manifest (the shared source manifest is dropped), so without preserving lineage the + manifest-list writer renumbers the survivor (the historical bug: next_row_id jumped 6->9, + survivor block frid 0->6). The fix materializes the survivor's absolute _row_id into + DataFile field 142 and inherits the source manifest's first_row_id. + """ + import itertools + import uuid + + from pyiceberg.io.pyarrow import _dataframe_to_data_files + + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + # Two data files in a SINGLE manifest (one fast-append commit): row ids 0-2 and 3-5. + with tbl.transaction() as txn: + with txn.update_snapshot().fast_append() as fast_append: + for ids in ([0, 1, 2], [100, 101, 102]): + for data_file in _dataframe_to_data_files( + io=tbl.io, + df=_batch(ids), + table_metadata=txn.table_metadata, + write_uuid=uuid.uuid4(), + counter=itertools.count(), + ): + fast_append.append_data_file(data_file) + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + # one shared manifest with first_row_id == 0 + assert [m[0] for m in _data_file_row_ids(tbl)] == [0, 0] + + # Whole-file delete the first file (ids 0,1,2): bound-provable (max=2 < 5). + tbl.delete(delete_filter="id < 5") + tbl = v3_catalog.load_table("ns.t") + + # next_row_id must NOT advance and no new rows are added. + assert tbl.metadata.next_row_id == 6, "survivor was re-numbered (next_row_id advanced)" + delete_snap = tbl.metadata.current_snapshot() + assert delete_snap is not None + assert delete_snap.added_rows == 0 + + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + manifest_frid, datafile_frid, rows = surviving[0] + assert rows == 3 + # The rewritten manifest inherits the source manifest's first_row_id (0)... + assert manifest_frid == 0, "rewritten manifest must inherit the source manifest first_row_id" + # ...and the survivor's absolute _row_id (3) is materialized into field 142. + assert datafile_frid == 3, "survivor's _row_id (field 142) must be materialized to its original value (3)" + + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [100, 101, 102] + + +def test_v3_partial_rewrite_delete_fails_loudly(v3_catalog: Catalog) -> None: + """A copy-on-write delete that needs to REWRITE a data file must fail loudly on v3. + + Preserving _row_id lineage across a physical rewrite needs materialized per-row _row_id + columns, which PyIceberg does not have. Rather than silently re-numbering survivors, the + v3 path raises NotImplementedError. + """ + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + # Single file containing 0..5; deleting a subset forces a partial rewrite. + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = v3_catalog.load_table("ns.t") + + with pytest.raises(NotImplementedError, match="copy-on-write delete"): + tbl.delete(delete_filter="id in (2, 3)") + + # The table state is unchanged (no corruption, no renumbering). + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 2, 3, 4, 5] + + +def test_v2_partial_rewrite_delete_still_works(tmp_path: Path) -> None: + """The loud v3 failure must NOT regress v2 copy-on-write deletes.""" + cat = InMemoryCatalog("t1v2", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = cat.load_table("ns.t") + tbl.delete(delete_filter="id in (2, 3)") + tbl = cat.load_table("ns.t") + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 4, 5] + + +def test_v3_upgrade_from_v2_seeds_next_row_id(tmp_path: Path) -> None: + """v2 -> v3 upgrade must be reachable via the public API and seed next_row_id = 0.""" + cat = InMemoryCatalog("t1up", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + assert tbl.metadata.format_version == 2 + # v2 metadata does not carry next-row-id at all. + assert getattr(tbl.metadata, "next_row_id", None) is None + + with tbl.transaction() as txn: + txn.upgrade_table_version(3) + tbl = cat.load_table("ns.t") + + assert tbl.metadata.format_version == 3 + assert tbl.metadata.next_row_id == 0, "upgrade to v3 must seed next_row_id at 0" + + # And the upgraded table must support a v3 append with correct row lineage. + tbl.append(_batch([1, 2, 3])) + tbl = cat.load_table("ns.t") + snap = tbl.metadata.current_snapshot() + assert snap is not None + assert snap.first_row_id == 0 + assert snap.added_rows == 3 + assert tbl.metadata.next_row_id == 3 From 585aa19271974f4b45653389871382d1232b3e3b Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 11:01:22 -0500 Subject: [PATCH 3/6] v3-fix: honest docstring on DataFile.first_row_id length guard Address skeptic re-audit LOW nit: from_args always allocates the full v3-width record, so the length-based "non-v3" guard never fires for from_args-built files. Clarify the comment; the guard is retained only for directly-constructed short records. --- pyiceberg/manifest.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 7fd90b2f10..04b24a4845 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -535,8 +535,10 @@ def sort_order_id(self) -> int | None: def first_row_id(self) -> int | None: """The _row_id for the first row in the data file (field 142, v3+ only). - Older (v1/v2) DataFile structs do not carry this field, so we return - ``None`` when the underlying record is too short to hold it. + Note: ``DataFile.from_args`` always allocates the full v3-width record + (``len == 20``) regardless of table version, so a v1/v2 file simply holds + ``None`` at position 16. The length guard below only matters for records + constructed directly from a shorter positional list. """ if len(self._data) > 16: return self._data[16] @@ -545,7 +547,7 @@ def first_row_id(self) -> int | None: @first_row_id.setter def first_row_id(self, value: int | None) -> None: if len(self._data) <= 16: - raise ValueError("Cannot set first_row_id on a non-v3 DataFile (field 142 absent)") + raise ValueError("Cannot set first_row_id: record has no field-142 slot") self._data[16] = value # Spec ID should not be stored in the file From b9b351ccf904af62b7d36bbba978fbaa97e942f3 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 12:42:33 -0500 Subject: [PATCH 4/6] v3: read-side _row_id / _last_updated_sequence_number materialization Surface per-row v3 row-lineage metadata columns at read time, completing the read half of row lineage (the write path already persists first_row_id / next_row_id). - schema.py: reserved metadata column constants (_row_id = 2147483540, _last_updated_sequence_number = 2147483539, both optional LongType) + name lookup. - table/__init__.py: - TableScan.projection now lets callers opt into the reserved columns by name (case-sensitive or not); select("*") does NOT auto-include them. - FileScanTask carries the manifest entry's data_sequence_number; planner wires it. - _open_manifest materializes each data file's inherited first_row_id from the manifest base per the v3 "First Row ID Inheritance" rule, counting deleted null-first_row_id entries toward the running base (fetch discard_deleted=False, drop deleted entries only after the base is computed). - io/pyarrow.py: _task_to_record_batches computes _row_id = first_row_id + in-file position and _last_updated_sequence_number = data sequence number, with per-row coalesce over any physically-materialized column. Row filter is applied after scan (not pushed down) when _row_id is requested so positions stay physical; positions ride through positional-delete take() and the post-scan filter to stay aligned. Tests: tests/table/test_v3_row_id_read.py (exact-value _row_id across two appends, projection alongside real columns, positional-delete+filter alignment, v2 raises, select("*") excludes metadata, inheritance counts deleted entries). STRETCH (partial-delete _row_id preservation) deferred: lifting the foundation's copy-on-write loud-fail requires physically writing a _row_id column into rewritten data files, which the schema-bound write path does not support yet. The read-side coalesce that would consume such a column is already implemented. --- pyiceberg/io/pyarrow.py | 158 +++++++++++++++++++-- pyiceberg/schema.py | 19 +++ pyiceberg/table/__init__.py | 71 ++++++++-- tests/table/test_v3_row_id_read.py | 218 +++++++++++++++++++++++++++++ 4 files changed, 445 insertions(+), 21 deletions(-) create mode 100644 tests/table/test_v3_row_id_read.py diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 076098c757..5ccddafc28 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -129,6 +129,9 @@ ) from pyiceberg.partitioning import PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import ( + LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID, + RESERVED_METADATA_FIELD_IDS, + ROW_ID_FIELD_ID, PartnerAccessor, PreOrderSchemaVisitor, Schema, @@ -1612,6 +1615,124 @@ def _get_column_projection_values( return projected_missing_fields +def _projected_data_schema(projected_schema: Schema) -> Schema: + return Schema( + *(field for field in projected_schema.fields if field.field_id not in RESERVED_METADATA_FIELD_IDS), + schema_id=projected_schema.schema_id, + identifier_field_ids=projected_schema.identifier_field_ids, + ) + + +def _projected_metadata_fields(projected_schema: Schema) -> tuple[NestedField, ...]: + return tuple(field for field in projected_schema.fields if field.field_id in RESERVED_METADATA_FIELD_IDS) + + +def _column_by_field_id(file_schema: Schema, batch: pa.RecordBatch, field_id: int) -> pa.Array | None: + for idx, field in enumerate(file_schema.fields): + if field.field_id == field_id: + return batch.column(idx) + return None + + +def _as_int64_array(array: pa.Array | pa.ChunkedArray) -> pa.Array: + if isinstance(array, pa.ChunkedArray): + array = array.combine_chunks() + if array.type != pa.int64(): + return array.cast(pa.int64()) + return array + + +def _int64_range(start: int, stop: int) -> pa.Array: + return pa.array(range(start, stop), type=pa.int64()) + + +def _filter_batch_and_positions( + batch: pa.RecordBatch, pyarrow_filter: ds.Expression, positions: pa.Array | None +) -> tuple[pa.RecordBatch, pa.Array | None]: + position_column_name = "__iceberg_row_position" + while position_column_name in batch.schema.names: + position_column_name = f"{position_column_name}_" + + table = pa.Table.from_batches([batch]) + if positions is not None: + table = table.append_column(position_column_name, positions) + + table = table.filter(pyarrow_filter) + if table.num_rows == 0: + return batch.slice(0, 0), pa.array([], type=pa.int64()) if positions is not None else None + + if positions is not None: + positions = _as_int64_array(table.column(position_column_name)) + table = table.drop([position_column_name]) + + return table.combine_chunks().to_batches()[0], positions + + +def _row_id_array(task: FileScanTask, file_schema: Schema, batch: pa.RecordBatch, positions: pa.Array | None) -> pa.Array: + if task.file.first_row_id is None: + raise ValueError("Cannot read _row_id: row lineage requires a v3 data file with first_row_id") + if positions is None: + raise ValueError("Cannot read _row_id: row positions were not materialized") + + computed_row_ids = pc.add(positions, pa.scalar(task.file.first_row_id, type=pa.int64())) + physical_row_ids = _column_by_field_id(file_schema, batch, ROW_ID_FIELD_ID) + if physical_row_ids is None: + return computed_row_ids + + return pc.coalesce(_as_int64_array(physical_row_ids), computed_row_ids) + + +def _last_updated_sequence_number_array(task: FileScanTask, file_schema: Schema, batch: pa.RecordBatch) -> pa.Array: + physical_sequence_numbers = _column_by_field_id(file_schema, batch, LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID) + + if task.data_sequence_number is None: + if physical_sequence_numbers is None: + raise ValueError( + "Cannot read _last_updated_sequence_number: data sequence number is missing from the file scan task" + ) + physical_sequence_numbers = _as_int64_array(physical_sequence_numbers) + if physical_sequence_numbers.null_count > 0: + raise ValueError( + "Cannot read _last_updated_sequence_number: data sequence number is required for null physical values" + ) + return physical_sequence_numbers + + fallback_sequence_numbers = pa.repeat(pa.scalar(task.data_sequence_number, type=pa.int64()), batch.num_rows) + if physical_sequence_numbers is None: + return fallback_sequence_numbers + + return pc.coalesce(_as_int64_array(physical_sequence_numbers), fallback_sequence_numbers) + + +def _append_row_lineage_metadata_columns( + projected_schema: Schema, + file_schema: Schema, + file_batch: pa.RecordBatch, + projected_batch: pa.RecordBatch, + task: FileScanTask, + positions: pa.Array | None, +) -> pa.RecordBatch: + metadata_fields = _projected_metadata_fields(projected_schema) + if not metadata_fields: + return projected_batch + + arrays = [projected_batch.column(idx) for idx in range(projected_batch.num_columns)] + fields = list(projected_batch.schema) + + for field in metadata_fields: + if field.field_id == ROW_ID_FIELD_ID: + metadata_array = _row_id_array(task, file_schema, file_batch, positions) + elif field.field_id == LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID: + metadata_array = _last_updated_sequence_number_array(task, file_schema, file_batch) + else: + continue + + arrays.append(metadata_array) + fields.append(pa.field(field.name, pa.int64(), nullable=True)) + + return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields)) + + def _task_to_record_batches( io: FileIO, task: FileScanTask, @@ -1644,6 +1765,9 @@ def _task_to_record_batches( file_schema = pyarrow_to_schema( physical_schema, name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version ) + data_projected_schema = _projected_data_schema(projected_schema) + metadata_fields = _projected_metadata_fields(projected_schema) + row_id_requested = any(field.field_id == ROW_ID_FIELD_ID for field in metadata_fields) # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection projected_missing_fields = _get_column_projection_values( @@ -1659,13 +1783,14 @@ def _task_to_record_batches( pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + apply_filter_after_scan = bool(positional_deletes) or row_id_requested fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, schema=physical_schema, # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first - filter=pyarrow_filter if not positional_deletes else None, + filter=pyarrow_filter if not apply_filter_after_scan else None, columns=[col.name for col in file_project_schema.columns], ) @@ -1675,34 +1800,41 @@ def _task_to_record_batches( next_index = next_index + len(batch) current_index = next_index - len(batch) current_batch = batch + positions = _int64_range(current_index, current_index + len(batch)) if row_id_requested else None if positional_deletes: # Create the mask of indices that we're interested in indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch)) current_batch = current_batch.take(indices) - if pyarrow_filter is not None: - # Temporary fix until PyArrow 21 is the minimum supported version - # (https://github.com/apache/arrow/pull/46057): RecordBatch.filter raises - # IndexError on PyArrow <21 when the result is empty; Table.filter does not. - table = pa.Table.from_batches([current_batch]) - table = table.filter(pyarrow_filter) - if table.num_rows == 0: - current_batch = current_batch.slice(0, 0) - else: - current_batch = table.combine_chunks().to_batches()[0] + if positions is not None: + positions = positions.take(indices) + + if apply_filter_after_scan and pyarrow_filter is not None: + # Temporary fix until PyArrow 21 is the minimum supported version + # (https://github.com/apache/arrow/pull/46057): RecordBatch.filter raises + # IndexError on PyArrow <21 when the result is empty; Table.filter does not. + current_batch, positions = _filter_batch_and_positions(current_batch, pyarrow_filter, positions) # skip empty batches if current_batch.num_rows == 0: continue - yield _to_requested_schema( - projected_schema, + projected_batch = _to_requested_schema( + data_projected_schema, file_project_schema, current_batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, projected_missing_fields=projected_missing_fields, allow_timestamp_tz_mismatch=True, ) + yield _append_row_lineage_metadata_columns( + projected_schema, + file_project_schema, + current_batch, + projected_batch, + task, + positions, + ) def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]: diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index fd60eb8f94..9d27a099b9 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -78,6 +78,25 @@ FIELD_ID_PROP = "field-id" ICEBERG_FIELD_NAME_PROP = "iceberg-field-name" +ROW_ID_FIELD_ID = 2147483540 +ROW_ID_COLUMN_NAME = "_row_id" +ROW_ID_FIELD = NestedField(ROW_ID_FIELD_ID, ROW_ID_COLUMN_NAME, LongType(), required=False) + +LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID = 2147483539 +LAST_UPDATED_SEQUENCE_NUMBER_COLUMN_NAME = "_last_updated_sequence_number" +LAST_UPDATED_SEQUENCE_NUMBER_FIELD = NestedField( + LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID, + LAST_UPDATED_SEQUENCE_NUMBER_COLUMN_NAME, + LongType(), + required=False, +) + +RESERVED_METADATA_COLUMNS_BY_NAME = { + ROW_ID_COLUMN_NAME: ROW_ID_FIELD, + LAST_UPDATED_SEQUENCE_NUMBER_COLUMN_NAME: LAST_UPDATED_SEQUENCE_NUMBER_FIELD, +} +RESERVED_METADATA_FIELD_IDS = frozenset(field.field_id for field in RESERVED_METADATA_COLUMNS_BY_NAME.values()) + class Schema(IcebergBaseModel): """A table Schema. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b086111fdc..632ec2a779 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -41,9 +41,9 @@ manifest_evaluator, ) from pyiceberg.io import FileIO, load_file_io -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec -from pyiceberg.schema import Schema +from pyiceberg.schema import RESERVED_METADATA_COLUMNS_BY_NAME, Schema from pyiceberg.table.delete_file_index import DeleteFileIndex from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider @@ -90,7 +90,7 @@ Record, TableVersion, ) -from pyiceberg.types import strtobool +from pyiceberg.types import NestedField, strtobool from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.properties import property_as_bool @@ -110,6 +110,13 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +_RESERVED_METADATA_COLUMNS_BY_NAME_LOWER = {name.lower(): field for name, field in RESERVED_METADATA_COLUMNS_BY_NAME.items()} + + +def _reserved_metadata_field(name: str, case_sensitive: bool) -> NestedField | None: + if case_sensitive: + return RESERVED_METADATA_COLUMNS_BY_NAME.get(name) + return _RESERVED_METADATA_COLUMNS_BY_NAME_LOWER.get(name.lower()) @dataclass() @@ -1974,10 +1981,40 @@ def projection(self) -> Schema: else: raise ValueError(f"Snapshot not found: {self.snapshot_id}") + metadata_fields: list[NestedField] = [] + metadata_field_ids: set[int] = set() + for field_name in self.selected_fields: + if metadata_field := _reserved_metadata_field(field_name, self.case_sensitive): + if metadata_field.field_id not in metadata_field_ids: + metadata_fields.append(metadata_field) + metadata_field_ids.add(metadata_field.field_id) + if "*" in self.selected_fields: - return current_schema + if not metadata_fields: + return current_schema + return Schema( + *current_schema.fields, + *metadata_fields, + schema_id=current_schema.schema_id, + identifier_field_ids=current_schema.identifier_field_ids, + ) + + selected_data_fields = tuple( + field_name + for field_name in self.selected_fields + if _reserved_metadata_field(field_name, self.case_sensitive) is None + ) + if selected_data_fields: + data_projection = current_schema.select(*selected_data_fields, case_sensitive=self.case_sensitive) + else: + data_projection = Schema(schema_id=current_schema.schema_id) - return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + return Schema( + *data_projection.fields, + *metadata_fields, + schema_id=data_projection.schema_id, + identifier_field_ids=data_projection.identifier_field_ids, + ) def use_ref(self: S, name: str) -> S: if self.snapshot_id: @@ -2002,16 +2039,19 @@ class FileScanTask(ScanTask): file: DataFile delete_files: set[DataFile] residual: BooleanExpression + data_sequence_number: int | None def __init__( self, data_file: DataFile, delete_files: set[DataFile] | None = None, residual: BooleanExpression = ALWAYS_TRUE, + data_sequence_number: int | None = None, ) -> None: self.file = data_file self.delete_files = delete_files or set() self.residual = residual + self.data_sequence_number = data_sequence_number @staticmethod def from_rest_response( @@ -2079,6 +2119,8 @@ def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: sort_order_id=rest_file.sort_order_id, ) data_file.spec_id = rest_file.spec_id + if isinstance(rest_file, RESTDataFile): + data_file.first_row_id = rest_file.first_row_id return data_file @@ -2093,10 +2135,22 @@ def _open_manifest( Returns: A list of ManifestEntry that matches the provided filters. """ + entries = list(manifest.fetch_manifest_entry(io, discard_deleted=False)) + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None: + next_row_id = manifest.first_row_id + for manifest_entry in entries: + data_file = manifest_entry.data_file + if data_file.content != DataFileContent.DATA: + continue + if data_file.first_row_id is None: + data_file.first_row_id = next_row_id + next_row_id += data_file.record_count return [ manifest_entry - for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True) - if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file) + for manifest_entry in entries + if manifest_entry.status != ManifestEntryStatus.DELETED + and partition_filter(manifest_entry.data_file) + and metrics_evaluator(manifest_entry.data_file) ] @@ -2236,7 +2290,7 @@ def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa. from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - target_schema = schema_to_pyarrow(self.projection()) + target_schema = schema_to_pyarrow(self.projection(), include_field_ids=False) batches = ArrowScan( self.table_metadata, self.io, @@ -2378,6 +2432,7 @@ def plan_files(self, manifests: Iterable[ManifestFile]) -> Iterable[FileScanTask residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( data_entry.data_file.partition ), + data_sequence_number=data_entry.sequence_number, ) for data_entry in data_entries ] diff --git a/tests/table/test_v3_row_id_read.py b/tests/table/test_v3_row_id_read.py new file mode 100644 index 0000000000..36a3ae40d5 --- /dev/null +++ b/tests/table/test_v3_row_id_read.py @@ -0,0 +1,218 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pathlib import Path + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.expressions import GreaterThan +from pyiceberg.expressions.visitors import bind +from pyiceberg.io.pyarrow import PYARROW_PARQUET_FIELD_ID_KEY, PyArrowFileIO, _task_to_record_batches +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestContent, ManifestEntry, ManifestEntryStatus +from pyiceberg.schema import ROW_ID_FIELD, ROW_ID_FIELD_ID, Schema +from pyiceberg.table import FileScanTask, _open_manifest +from pyiceberg.types import IntegerType, NestedField, StringType + +SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=2, name="name", field_type=StringType(), required=False), +) + +ARROW_SCHEMA = pa.schema([ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), +]) + + +def _batch(ids: list[int]) -> pa.Table: + return pa.Table.from_pylist( + [{"id": row_id, "name": f"row-{row_id}"} for row_id in ids], + schema=ARROW_SCHEMA, + ) + + +def _create_table(tmp_path: Path, format_version: str = "3"): + catalog = InMemoryCatalog(f"row-id-read-{format_version}", warehouse=f"file://{tmp_path}") + catalog.create_namespace("ns") + table = catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": format_version}) + return catalog, table + + +def _data_sequence_numbers(table) -> set[int]: + snapshot = table.metadata.current_snapshot() + assert snapshot is not None + + sequence_numbers: set[int] = set() + for manifest in snapshot.manifests(table.io): + if manifest.content != ManifestContent.DATA: + continue + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True): + assert entry.sequence_number is not None + sequence_numbers.add(entry.sequence_number) + + return sequence_numbers + + +def _manifest_entry(status: ManifestEntryStatus, file_path: str, record_count: int) -> ManifestEntry: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=record_count, + file_size_in_bytes=1, + ) + return ManifestEntry.from_args(status=status, data_file=data_file) + + +def test_v3_scan_select_row_id(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + result = table.scan().select("_row_id").to_arrow() + + assert result.column_names == ["_row_id"] + assert result.column("_row_id").to_pylist() == [0, 1, 2] + + +def test_open_manifest_row_id_inheritance_counts_deleted_null_first_row_id_entries() -> None: + deleted_entry = _manifest_entry(ManifestEntryStatus.DELETED, "deleted.parquet", 5) + live_entry = _manifest_entry(ManifestEntryStatus.ADDED, "live.parquet", 2) + + class ManifestWithDeletedEntry: + content = ManifestContent.DATA + first_row_id = 100 + + def __init__(self) -> None: + self.discard_deleted: bool | None = None + + def fetch_manifest_entry(self, io, discard_deleted: bool = True): + self.discard_deleted = discard_deleted + entries = [deleted_entry, live_entry] + if discard_deleted: + return [entry for entry in entries if entry.status != ManifestEntryStatus.DELETED] + return entries + + manifest = ManifestWithDeletedEntry() + + entries = _open_manifest(object(), manifest, lambda _: True, lambda _: True) + + assert manifest.discard_deleted is False + assert [entry.data_file.file_path for entry in entries] == ["live.parquet"] + assert deleted_entry.data_file.first_row_id == 100 + assert live_entry.data_file.first_row_id == 105 + + +def test_v3_scan_row_ids_continue_across_appends(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + table.append(_batch([4, 5])) + table = catalog.load_table("ns.t") + + rows = table.scan().select("id", "_row_id").to_arrow().to_pylist() + + assert {row["id"]: row["_row_id"] for row in rows} == {1: 0, 2: 1, 3: 2, 4: 3, 5: 4} + + +def test_v3_scan_projects_row_id_alongside_real_columns(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + plain = table.scan().select("id", "name").to_arrow() + with_row_id = table.scan().select("id", "name", "_row_id").to_arrow() + + assert with_row_id.column_names == ["id", "name", "_row_id"] + assert with_row_id.select(["id", "name"]).to_pylist() == plain.to_pylist() + assert with_row_id.column("_row_id").to_pylist() == [0, 1, 2] + + +def test_v3_scan_select_last_updated_sequence_number(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + sequence_numbers = _data_sequence_numbers(table) + assert len(sequence_numbers) == 1 + data_sequence_number = next(iter(sequence_numbers)) + + result = table.scan().select("_last_updated_sequence_number").to_arrow() + + assert result.column_names == ["_last_updated_sequence_number"] + assert result.column("_last_updated_sequence_number").to_pylist() == [data_sequence_number] * 3 + + +def test_v2_scan_select_row_id_raises(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path, format_version="2") + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + with pytest.raises(ValueError, match="row lineage requires a v3 data file with first_row_id"): + table.scan().select("_row_id").to_arrow() + + +def test_v3_select_star_does_not_include_row_id(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + result = table.scan().select("*").to_arrow() + + assert result.column_names == ["id", "name"] + + +def test_row_id_positions_survive_positional_deletes_and_filter(tmp_path: Path) -> None: + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema) + file_path = str(tmp_path / "row-id-positional-filter.parquet") + with pq.ParquetWriter(file_path, arrow_schema) as writer: + writer.write_table(arrow_table) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=len(arrow_table), + file_size_in_bytes=22, + ) + data_file.first_row_id = 10 + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=False), ROW_ID_FIELD) + positional_deletes = [pa.chunked_array([pa.array([1, 3], type=pa.int64())])] + + batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file, data_sequence_number=7), + bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True), + projected_schema=projected_schema, + table_schema=table_schema, + projected_field_ids={1, ROW_ID_FIELD_ID}, + positional_deletes=positional_deletes, + case_sensitive=True, + ) + ) + + assert len(batches) == 1 + assert batches[0].to_pydict() == {"id": [3, 5], "_row_id": [12, 14]} From cedbefc0294764a75fa8d1e9d3bb6a7430891533 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 21:52:39 -0500 Subject: [PATCH 5/6] v3: read _row_id as null when first_row_id is null; gate lineage columns to v3+ --- pyiceberg/io/pyarrow.py | 8 +++-- pyiceberg/table/__init__.py | 6 ++++ tests/table/test_v3_row_id_read.py | 51 ++++++++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5ccddafc28..6c2cdf253d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1670,7 +1670,9 @@ def _filter_batch_and_positions( def _row_id_array(task: FileScanTask, file_schema: Schema, batch: pa.RecordBatch, positions: pa.Array | None) -> pa.Array: if task.file.first_row_id is None: - raise ValueError("Cannot read _row_id: row lineage requires a v3 data file with first_row_id") + # Snapshots written before row lineage was enabled (e.g. pre-upgrade snapshots of an + # upgraded table) have a null first_row_id, so _row_id reads as null for all rows. + return pa.nulls(batch.num_rows, type=pa.int64()) if positions is None: raise ValueError("Cannot read _row_id: row positions were not materialized") @@ -1688,7 +1690,9 @@ def _last_updated_sequence_number_array(task: FileScanTask, file_schema: Schema, if task.data_sequence_number is None: if physical_sequence_numbers is None: raise ValueError( - "Cannot read _last_updated_sequence_number: data sequence number is missing from the file scan task" + "Cannot read _last_updated_sequence_number: the file scan task has no data sequence number. " + "Server-side/REST scan planning does not yet supply the data sequence number required to " + "materialize this column." ) physical_sequence_numbers = _as_int64_array(physical_sequence_numbers) if physical_sequence_numbers.null_count > 0: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 632ec2a779..7b6e74768b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1989,6 +1989,12 @@ def projection(self) -> Schema: metadata_fields.append(metadata_field) metadata_field_ids.add(metadata_field.field_id) + if metadata_fields and self.table_metadata.format_version < 3: + raise ValueError( + "Row lineage metadata columns (_row_id, _last_updated_sequence_number) are only available for v3+ " + f"tables; this table is format-version {self.table_metadata.format_version}." + ) + if "*" in self.selected_fields: if not metadata_fields: return current_schema diff --git a/tests/table/test_v3_row_id_read.py b/tests/table/test_v3_row_id_read.py index 36a3ae40d5..70853e1fdb 100644 --- a/tests/table/test_v3_row_id_read.py +++ b/tests/table/test_v3_row_id_read.py @@ -22,7 +22,7 @@ import pytest from pyiceberg.catalog.memory import InMemoryCatalog -from pyiceberg.expressions import GreaterThan +from pyiceberg.expressions import AlwaysTrue, GreaterThan from pyiceberg.expressions.visitors import bind from pyiceberg.io.pyarrow import PYARROW_PARQUET_FIELD_ID_KEY, PyArrowFileIO, _task_to_record_batches from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestContent, ManifestEntry, ManifestEntryStatus @@ -166,10 +166,19 @@ def test_v2_scan_select_row_id_raises(tmp_path: Path) -> None: table.append(_batch([1, 2, 3])) table = catalog.load_table("ns.t") - with pytest.raises(ValueError, match="row lineage requires a v3 data file with first_row_id"): + with pytest.raises(ValueError, match="only available for v3"): table.scan().select("_row_id").to_arrow() +def test_v2_scan_select_last_updated_sequence_number_raises(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path, format_version="2") + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + with pytest.raises(ValueError, match="only available for v3"): + table.scan().select("_last_updated_sequence_number").to_arrow() + + def test_v3_select_star_does_not_include_row_id(tmp_path: Path) -> None: catalog, table = _create_table(tmp_path) table.append(_batch([1, 2, 3])) @@ -216,3 +225,41 @@ def test_row_id_positions_survive_positional_deletes_and_filter(tmp_path: Path) assert len(batches) == 1 assert batches[0].to_pydict() == {"id": [3, 5], "_row_id": [12, 14]} + + +def test_row_id_null_when_first_row_id_missing(tmp_path: Path) -> None: + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema) + file_path = str(tmp_path / "row-id-null-first-row-id.parquet") + with pq.ParquetWriter(file_path, arrow_schema) as writer: + writer.write_table(arrow_table) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=len(arrow_table), + file_size_in_bytes=22, + ) + # Pre-upgrade snapshots have a null first_row_id; _row_id must read as null per the v3 spec. + data_file.first_row_id = None + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=False), ROW_ID_FIELD) + + batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file, data_sequence_number=7), + bound_row_filter=AlwaysTrue(), + projected_schema=projected_schema, + table_schema=table_schema, + projected_field_ids={1, ROW_ID_FIELD_ID}, + positional_deletes=None, + case_sensitive=True, + ) + ) + + assert len(batches) == 1 + assert batches[0].to_pydict() == {"id": [1, 2, 3, 4, 5], "_row_id": [None, None, None, None, None]} From 345dbfd4046cb903ba14ab3021cd0860ea691b7f Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Wed, 24 Jun 2026 08:24:24 -0500 Subject: [PATCH 6/6] chore: satisfy mypy/ruff on v3 row-id read paths and tests --- pyiceberg/manifest.py | 2 +- pyiceberg/table/__init__.py | 4 +--- tests/table/test_init.py | 4 +++- tests/table/test_v3_row_id_read.py | 32 ++++++++++++++++++++---------- tests/table/test_v3_row_lineage.py | 19 +++++++++--------- 5 files changed, 37 insertions(+), 24 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 04b24a4845..e630ae6535 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -893,7 +893,7 @@ def first_row_id(self) -> int | None: return self._data[15] if len(self._data) > 15 else None @first_row_id.setter - def first_row_id(self, value: int) -> None: + def first_row_id(self, value: int | None) -> None: if len(self._data) <= 15: self._data.append(value) else: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7b6e74768b..4f143ec53b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2006,9 +2006,7 @@ def projection(self) -> Schema: ) selected_data_fields = tuple( - field_name - for field_name in self.selected_fields - if _reserved_metadata_field(field_name, self.case_sensitive) is None + field_name for field_name in self.selected_fields if _reserved_metadata_field(field_name, self.case_sensitive) is None ) if selected_data_fields: data_projection = current_schema.select(*selected_data_fields, case_sensitive=self.case_sensitive) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index d4f5195d68..e789ea98f0 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1764,7 +1764,9 @@ def test_add_snapshot_update_fails_with_null_table_next_row_id(table_v3: Table) ValueError, match="Cannot add a snapshot when table next-row-id is null", ): - update_table_metadata(table_v3.metadata.model_copy(update={"next_row_id": None}), (AddSnapshotUpdate(snapshot=new_snapshot),)) + update_table_metadata( + table_v3.metadata.model_copy(update={"next_row_id": None}), (AddSnapshotUpdate(snapshot=new_snapshot),) + ) def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: diff --git a/tests/table/test_v3_row_id_read.py b/tests/table/test_v3_row_id_read.py index 70853e1fdb..7455fbfb2b 100644 --- a/tests/table/test_v3_row_id_read.py +++ b/tests/table/test_v3_row_id_read.py @@ -16,6 +16,7 @@ # under the License. from pathlib import Path +from typing import cast import pyarrow as pa import pyarrow.parquet as pq @@ -24,10 +25,19 @@ from pyiceberg.catalog.memory import InMemoryCatalog from pyiceberg.expressions import AlwaysTrue, GreaterThan from pyiceberg.expressions.visitors import bind +from pyiceberg.io import FileIO from pyiceberg.io.pyarrow import PYARROW_PARQUET_FIELD_ID_KEY, PyArrowFileIO, _task_to_record_batches -from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestContent, ManifestEntry, ManifestEntryStatus +from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ManifestContent, + ManifestEntry, + ManifestEntryStatus, + ManifestFile, +) from pyiceberg.schema import ROW_ID_FIELD, ROW_ID_FIELD_ID, Schema -from pyiceberg.table import FileScanTask, _open_manifest +from pyiceberg.table import FileScanTask, Table, _open_manifest from pyiceberg.types import IntegerType, NestedField, StringType SCHEMA = Schema( @@ -35,10 +45,12 @@ NestedField(field_id=2, name="name", field_type=StringType(), required=False), ) -ARROW_SCHEMA = pa.schema([ - pa.field("id", pa.int32(), nullable=True), - pa.field("name", pa.string(), nullable=True), -]) +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), + ] +) def _batch(ids: list[int]) -> pa.Table: @@ -48,14 +60,14 @@ def _batch(ids: list[int]) -> pa.Table: ) -def _create_table(tmp_path: Path, format_version: str = "3"): +def _create_table(tmp_path: Path, format_version: str = "3") -> tuple[InMemoryCatalog, Table]: catalog = InMemoryCatalog(f"row-id-read-{format_version}", warehouse=f"file://{tmp_path}") catalog.create_namespace("ns") table = catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": format_version}) return catalog, table -def _data_sequence_numbers(table) -> set[int]: +def _data_sequence_numbers(table: Table) -> set[int]: snapshot = table.metadata.current_snapshot() assert snapshot is not None @@ -104,7 +116,7 @@ class ManifestWithDeletedEntry: def __init__(self) -> None: self.discard_deleted: bool | None = None - def fetch_manifest_entry(self, io, discard_deleted: bool = True): + def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: self.discard_deleted = discard_deleted entries = [deleted_entry, live_entry] if discard_deleted: @@ -113,7 +125,7 @@ def fetch_manifest_entry(self, io, discard_deleted: bool = True): manifest = ManifestWithDeletedEntry() - entries = _open_manifest(object(), manifest, lambda _: True, lambda _: True) + entries = _open_manifest(cast(FileIO, object()), cast(ManifestFile, manifest), lambda _: True, lambda _: True) assert manifest.discard_deleted is False assert [entry.data_file.file_path for entry in entries] == ["live.parquet"] diff --git a/tests/table/test_v3_row_lineage.py b/tests/table/test_v3_row_lineage.py index 6286bb9dbe..f6abba42b1 100644 --- a/tests/table/test_v3_row_lineage.py +++ b/tests/table/test_v3_row_lineage.py @@ -24,6 +24,7 @@ import json from pathlib import Path +from typing import cast import pyarrow as pa import pytest @@ -46,10 +47,12 @@ def v3_catalog(tmp_path: Path) -> Catalog: NestedField(field_id=2, name="name", field_type=StringType(), required=False), ) -ARROW_SCHEMA = pa.schema([ - pa.field("id", pa.int32(), nullable=True), - pa.field("name", pa.string(), nullable=True), -]) +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), + ] +) def _batch(ids: list[int]) -> pa.Table: @@ -104,9 +107,7 @@ def test_v3_append_twice_row_lineage_is_monotonic(v3_catalog: Catalog) -> None: assert running == tbl.metadata.next_row_id -def test_v3_merge_append_does_not_double_count_existing_rows( - v3_catalog: Catalog, monkeypatch: pytest.MonkeyPatch -) -> None: +def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog, monkeypatch: pytest.MonkeyPatch) -> None: """Merge-append must actually MERGE manifests and must NOT double-count existing rows. This is the #3070 double-count regression guard. We instrument the merge manager so the @@ -159,7 +160,7 @@ def _counting_create(self, spec_id, manifest_bin): # type: ignore[no-untyped-de # The merged manifests must tile [0, 9) exactly, with each data file's row range coherent. assigned = sorted( - ((m.first_row_id, m.existing_rows_count + m.added_rows_count) for m in data_manifests), + ((cast(int, m.first_row_id), cast(int, m.existing_rows_count) + cast(int, m.added_rows_count)) for m in data_manifests), key=lambda pair: pair[0], ) cursor = 0 @@ -285,7 +286,7 @@ def test_v3_whole_file_delete_with_two_survivors_renumbers_none(v3_catalog: Cata tbl = v3_catalog.load_table("ns.t") assert tbl.metadata.next_row_id == 6, "no new row ids on a whole-file delete" - frids = sorted(triple[0] for triple in _data_file_row_ids(tbl)) + frids = sorted(cast(int, triple[0]) for triple in _data_file_row_ids(tbl)) # survivors must keep first_row_ids 0 and 4 (NOT renumbered to 0 and 2). assert frids == [0, 4], "survivors were re-numbered after deleting the middle file" actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist())