diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 076098c757..6c2cdf253d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -129,6 +129,9 @@ ) from pyiceberg.partitioning import PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import ( + LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID, + RESERVED_METADATA_FIELD_IDS, + ROW_ID_FIELD_ID, PartnerAccessor, PreOrderSchemaVisitor, Schema, @@ -1612,6 +1615,128 @@ def _get_column_projection_values( return projected_missing_fields +def _projected_data_schema(projected_schema: Schema) -> Schema: + return Schema( + *(field for field in projected_schema.fields if field.field_id not in RESERVED_METADATA_FIELD_IDS), + schema_id=projected_schema.schema_id, + identifier_field_ids=projected_schema.identifier_field_ids, + ) + + +def _projected_metadata_fields(projected_schema: Schema) -> tuple[NestedField, ...]: + return tuple(field for field in projected_schema.fields if field.field_id in RESERVED_METADATA_FIELD_IDS) + + +def _column_by_field_id(file_schema: Schema, batch: pa.RecordBatch, field_id: int) -> pa.Array | None: + for idx, field in enumerate(file_schema.fields): + if field.field_id == field_id: + return batch.column(idx) + return None + + +def _as_int64_array(array: pa.Array | pa.ChunkedArray) -> pa.Array: + if isinstance(array, pa.ChunkedArray): + array = array.combine_chunks() + if array.type != pa.int64(): + return array.cast(pa.int64()) + return array + + +def _int64_range(start: int, stop: int) -> pa.Array: + return pa.array(range(start, stop), type=pa.int64()) + + +def _filter_batch_and_positions( + batch: pa.RecordBatch, pyarrow_filter: ds.Expression, positions: pa.Array | None +) -> tuple[pa.RecordBatch, pa.Array | None]: + position_column_name = "__iceberg_row_position" + while position_column_name in batch.schema.names: + position_column_name = f"{position_column_name}_" + + table = pa.Table.from_batches([batch]) + if positions is not None: + table = table.append_column(position_column_name, positions) + + table = table.filter(pyarrow_filter) + if table.num_rows == 0: + return batch.slice(0, 0), pa.array([], type=pa.int64()) if positions is not None else None + + if positions is not None: + positions = _as_int64_array(table.column(position_column_name)) + table = table.drop([position_column_name]) + + return table.combine_chunks().to_batches()[0], positions + + +def _row_id_array(task: FileScanTask, file_schema: Schema, batch: pa.RecordBatch, positions: pa.Array | None) -> pa.Array: + if task.file.first_row_id is None: + # Snapshots written before row lineage was enabled (e.g. pre-upgrade snapshots of an + # upgraded table) have a null first_row_id, so _row_id reads as null for all rows. + return pa.nulls(batch.num_rows, type=pa.int64()) + if positions is None: + raise ValueError("Cannot read _row_id: row positions were not materialized") + + computed_row_ids = pc.add(positions, pa.scalar(task.file.first_row_id, type=pa.int64())) + physical_row_ids = _column_by_field_id(file_schema, batch, ROW_ID_FIELD_ID) + if physical_row_ids is None: + return computed_row_ids + + return pc.coalesce(_as_int64_array(physical_row_ids), computed_row_ids) + + +def _last_updated_sequence_number_array(task: FileScanTask, file_schema: Schema, batch: pa.RecordBatch) -> pa.Array: + physical_sequence_numbers = _column_by_field_id(file_schema, batch, LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID) + + if task.data_sequence_number is None: + if physical_sequence_numbers is None: + raise ValueError( + "Cannot read _last_updated_sequence_number: the file scan task has no data sequence number. " + "Server-side/REST scan planning does not yet supply the data sequence number required to " + "materialize this column." + ) + physical_sequence_numbers = _as_int64_array(physical_sequence_numbers) + if physical_sequence_numbers.null_count > 0: + raise ValueError( + "Cannot read _last_updated_sequence_number: data sequence number is required for null physical values" + ) + return physical_sequence_numbers + + fallback_sequence_numbers = pa.repeat(pa.scalar(task.data_sequence_number, type=pa.int64()), batch.num_rows) + if physical_sequence_numbers is None: + return fallback_sequence_numbers + + return pc.coalesce(_as_int64_array(physical_sequence_numbers), fallback_sequence_numbers) + + +def _append_row_lineage_metadata_columns( + projected_schema: Schema, + file_schema: Schema, + file_batch: pa.RecordBatch, + projected_batch: pa.RecordBatch, + task: FileScanTask, + positions: pa.Array | None, +) -> pa.RecordBatch: + metadata_fields = _projected_metadata_fields(projected_schema) + if not metadata_fields: + return projected_batch + + arrays = [projected_batch.column(idx) for idx in range(projected_batch.num_columns)] + fields = list(projected_batch.schema) + + for field in metadata_fields: + if field.field_id == ROW_ID_FIELD_ID: + metadata_array = _row_id_array(task, file_schema, file_batch, positions) + elif field.field_id == LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID: + metadata_array = _last_updated_sequence_number_array(task, file_schema, file_batch) + else: + continue + + arrays.append(metadata_array) + fields.append(pa.field(field.name, pa.int64(), nullable=True)) + + return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields)) + + def _task_to_record_batches( io: FileIO, task: FileScanTask, @@ -1644,6 +1769,9 @@ def _task_to_record_batches( file_schema = pyarrow_to_schema( physical_schema, name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version ) + data_projected_schema = _projected_data_schema(projected_schema) + metadata_fields = _projected_metadata_fields(projected_schema) + row_id_requested = any(field.field_id == ROW_ID_FIELD_ID for field in metadata_fields) # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection projected_missing_fields = _get_column_projection_values( @@ -1659,13 +1787,14 @@ def _task_to_record_batches( pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + apply_filter_after_scan = bool(positional_deletes) or row_id_requested fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, schema=physical_schema, # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first - filter=pyarrow_filter if not positional_deletes else None, + filter=pyarrow_filter if not apply_filter_after_scan else None, columns=[col.name for col in file_project_schema.columns], ) @@ -1675,34 +1804,41 @@ def _task_to_record_batches( next_index = next_index + len(batch) current_index = next_index - len(batch) current_batch = batch + positions = _int64_range(current_index, current_index + len(batch)) if row_id_requested else None if positional_deletes: # Create the mask of indices that we're interested in indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch)) current_batch = current_batch.take(indices) - if pyarrow_filter is not None: - # Temporary fix until PyArrow 21 is the minimum supported version - # (https://github.com/apache/arrow/pull/46057): RecordBatch.filter raises - # IndexError on PyArrow <21 when the result is empty; Table.filter does not. - table = pa.Table.from_batches([current_batch]) - table = table.filter(pyarrow_filter) - if table.num_rows == 0: - current_batch = current_batch.slice(0, 0) - else: - current_batch = table.combine_chunks().to_batches()[0] + if positions is not None: + positions = positions.take(indices) + + if apply_filter_after_scan and pyarrow_filter is not None: + # Temporary fix until PyArrow 21 is the minimum supported version + # (https://github.com/apache/arrow/pull/46057): RecordBatch.filter raises + # IndexError on PyArrow <21 when the result is empty; Table.filter does not. + current_batch, positions = _filter_batch_and_positions(current_batch, pyarrow_filter, positions) # skip empty batches if current_batch.num_rows == 0: continue - yield _to_requested_schema( - projected_schema, + projected_batch = _to_requested_schema( + data_projected_schema, file_project_schema, current_batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, projected_missing_fields=projected_missing_fields, allow_timestamp_tz_mismatch=True, ) + yield _append_row_lineage_metadata_columns( + projected_schema, + file_project_schema, + current_batch, + projected_batch, + task, + positions, + ) def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]: diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..e630ae6535 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -54,7 +54,7 @@ UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 -DEFAULT_READ_VERSION: Literal[2] = 2 +DEFAULT_READ_VERSION: Literal[3] = 3 INITIAL_SEQUENCE_NUMBER = 0 @@ -531,6 +531,25 @@ def equality_ids(self) -> list[int] | None: def sort_order_id(self) -> int | None: return self._data[15] + @property + def first_row_id(self) -> int | None: + """The _row_id for the first row in the data file (field 142, v3+ only). + + Note: ``DataFile.from_args`` always allocates the full v3-width record + (``len == 20``) regardless of table version, so a v1/v2 file simply holds + ``None`` at position 16. The length guard below only matters for records + constructed directly from a shorter positional list. + """ + if len(self._data) > 16: + return self._data[16] + return None + + @first_row_id.setter + def first_row_id(self, value: int | None) -> None: + if len(self._data) <= 16: + raise ValueError("Cannot set first_row_id: record has no field-142 slot") + self._data[16] = value + # Spec ID should not be stored in the file _spec_id: int @@ -549,6 +568,23 @@ def __setattr__(self, name: str, value: Any) -> None: value = FileFormat[value] super().__setattr__(name, value) + def __copy__(self) -> DataFile: + """Return a copy whose ``_data`` list is independent from the original. + + ``Record`` stores its fields in a mutable ``_data`` list, so the default + ``copy.copy`` would share that list and a mutation on the copy (e.g. + assigning ``first_row_id`` while materializing row lineage on a delete) + would leak back into the source DataFile. Copying the list isolates + field-level mutations to the copy while preserving ``_spec_id``. + """ + cls = self.__class__ + new = cls.__new__(cls) + new._data = list(self._data) + instance_dict = getattr(self, "__dict__", None) + if instance_dict: + new.__dict__.update(instance_dict) + return new + def __hash__(self) -> int: """Return the hash of the file path.""" return hash(self.file_path) @@ -852,6 +888,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None: def key_metadata(self) -> bytes | None: return self._data[14] + @property + def first_row_id(self) -> int | None: + return self._data[15] if len(self._data) > 15 else None + + @first_row_id.setter + def first_row_id(self, value: int | None) -> None: + if len(self._data) <= 15: + self._data.append(value) + else: + self._data[15] = value + def has_added_files(self) -> bool: return self.added_files_count is None or self.added_files_count > 0 @@ -1240,6 +1287,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: return entry +class ManifestWriterV3(ManifestWriterV2): + @property + def version(self) -> TableVersion: + return 3 + + def write_manifest( format_version: TableVersion, spec: PartitionSpec, @@ -1252,6 +1305,8 @@ def write_manifest( return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) + elif format_version == 3: + return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") @@ -1295,6 +1350,10 @@ def __exit__( @abstractmethod def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ... + @property + def next_row_id(self) -> int | None: + return None + def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter: self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files]) return self @@ -1351,9 +1410,7 @@ def __init__( self._commit_snapshot_id = snapshot_id self._sequence_number = sequence_number - def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: - wrapped_manifest_file = copy(manifest_file) - + def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile: if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ: # if the sequence number is being assigned here, then the manifest must be created by the current operation. # To validate this, check that the snapshot id matches the current commit @@ -1374,6 +1431,56 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: wrapped_manifest_file.min_sequence_number = self._sequence_number return wrapped_manifest_file + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + return self._prepare_manifest_for_commit(copy(manifest_file)) + + +class ManifestListWriterV3(ManifestListWriterV2): + _next_row_id: int + + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: int | None, + sequence_number: int, + snapshot_first_row_id: int, + compression: AvroCompressionCodec, + ): + super().__init__( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + compression=compression, + ) + self._format_version = 3 + self._meta = { + "snapshot-id": str(snapshot_id), + "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", + "sequence-number": str(sequence_number), + "first-row-id": str(snapshot_first_row_id), + "format-version": "3", + AVRO_CODEC_KEY: compression, + } + self._next_row_id = snapshot_first_row_id + + @property + def next_row_id(self) -> int | None: + return self._next_row_id + + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + wrapped = self._prepare_manifest_for_commit(copy(manifest_file)) + if wrapped.content == ManifestContent.DATA and wrapped.first_row_id is None: + if wrapped.existing_rows_count is None or wrapped.added_rows_count is None: + raise ValueError( + "Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: " + + wrapped.manifest_path + ) + wrapped.first_row_id = self._next_row_id + self._next_row_id += wrapped.existing_rows_count + wrapped.added_rows_count + return wrapped + def write_manifest_list( format_version: TableVersion, @@ -1382,6 +1489,7 @@ def write_manifest_list( parent_snapshot_id: int | None, sequence_number: int | None, avro_compression: AvroCompressionCodec, + snapshot_first_row_id: int | None = None, ) -> ManifestListWriter: if format_version == 1: return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression) @@ -1389,5 +1497,18 @@ def write_manifest_list( if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression) + elif format_version == 3: + if sequence_number is None: + raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}") + if snapshot_first_row_id is None: + raise ValueError(f"Snapshot-first-row-id is required for V3 tables: {snapshot_first_row_id}") + return ManifestListWriterV3( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + snapshot_first_row_id=snapshot_first_row_id, + compression=avro_compression, + ) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index fd60eb8f94..9d27a099b9 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -78,6 +78,25 @@ FIELD_ID_PROP = "field-id" ICEBERG_FIELD_NAME_PROP = "iceberg-field-name" +ROW_ID_FIELD_ID = 2147483540 +ROW_ID_COLUMN_NAME = "_row_id" +ROW_ID_FIELD = NestedField(ROW_ID_FIELD_ID, ROW_ID_COLUMN_NAME, LongType(), required=False) + +LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID = 2147483539 +LAST_UPDATED_SEQUENCE_NUMBER_COLUMN_NAME = "_last_updated_sequence_number" +LAST_UPDATED_SEQUENCE_NUMBER_FIELD = NestedField( + LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID, + LAST_UPDATED_SEQUENCE_NUMBER_COLUMN_NAME, + LongType(), + required=False, +) + +RESERVED_METADATA_COLUMNS_BY_NAME = { + ROW_ID_COLUMN_NAME: ROW_ID_FIELD, + LAST_UPDATED_SEQUENCE_NUMBER_COLUMN_NAME: LAST_UPDATED_SEQUENCE_NUMBER_FIELD, +} +RESERVED_METADATA_FIELD_IDS = frozenset(field.field_id for field in RESERVED_METADATA_COLUMNS_BY_NAME.values()) + class Schema(IcebergBaseModel): """A table Schema. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..4f143ec53b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -41,14 +41,14 @@ manifest_evaluator, ) from pyiceberg.io import FileIO, load_file_io -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec -from pyiceberg.schema import Schema +from pyiceberg.schema import RESERVED_METADATA_COLUMNS_BY_NAME, Schema from pyiceberg.table.delete_file_index import DeleteFileIndex from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.maintenance import MaintenanceTable -from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata +from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry @@ -90,7 +90,7 @@ Record, TableVersion, ) -from pyiceberg.types import strtobool +from pyiceberg.types import NestedField, strtobool from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.properties import property_as_bool @@ -110,6 +110,13 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +_RESERVED_METADATA_COLUMNS_BY_NAME_LOWER = {name.lower(): field for name, field in RESERVED_METADATA_COLUMNS_BY_NAME.items()} + + +def _reserved_metadata_field(name: str, case_sensitive: bool) -> NestedField | None: + if case_sensitive: + return RESERVED_METADATA_COLUMNS_BY_NAME.get(name) + return _RESERVED_METADATA_COLUMNS_BY_NAME_LOWER.get(name.lower()) @dataclass() @@ -293,7 +300,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction: Returns: The alter table builder. """ - if format_version not in {1, 2}: + if not 1 <= format_version <= SUPPORTED_TABLE_FORMAT_VERSION: raise ValueError(f"Unsupported table format version: {format_version}") if format_version < self.table_metadata.format_version: @@ -730,6 +737,23 @@ def delete( # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: + if self.table_metadata.format_version >= 3: + # A partial (copy-on-write) delete physically rewrites surviving rows into a + # NEW data file. Per the v3 spec those rows must KEEP their original _row_id + # (field 142 / the _row_id metadata column). After a partial delete the + # survivors are generally non-contiguous (e.g. 0,1,4,5), so their lineage can + # only be preserved by materializing an explicit per-row _row_id column on read + # and persisting it on rewrite. PyIceberg has no read-side _row_id + # materialization yet, so doing this rewrite would silently RE-NUMBER the + # surviving rows and corrupt row lineage. We fail loudly instead. + raise NotImplementedError( + "v3 copy-on-write delete that requires rewriting a data file is not " + "supported yet: surviving rows would lose their _row_id lineage because " + "PyIceberg cannot materialize/preserve per-row _row_id (field 142) across a " + "physical rewrite. Whole-file deletes (which drop entire data files and " + "preserve survivors' row ids) are supported. Track: read-side _row_id " + "materialization." + ) bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) @@ -1957,10 +1981,44 @@ def projection(self) -> Schema: else: raise ValueError(f"Snapshot not found: {self.snapshot_id}") + metadata_fields: list[NestedField] = [] + metadata_field_ids: set[int] = set() + for field_name in self.selected_fields: + if metadata_field := _reserved_metadata_field(field_name, self.case_sensitive): + if metadata_field.field_id not in metadata_field_ids: + metadata_fields.append(metadata_field) + metadata_field_ids.add(metadata_field.field_id) + + if metadata_fields and self.table_metadata.format_version < 3: + raise ValueError( + "Row lineage metadata columns (_row_id, _last_updated_sequence_number) are only available for v3+ " + f"tables; this table is format-version {self.table_metadata.format_version}." + ) + if "*" in self.selected_fields: - return current_schema + if not metadata_fields: + return current_schema + return Schema( + *current_schema.fields, + *metadata_fields, + schema_id=current_schema.schema_id, + identifier_field_ids=current_schema.identifier_field_ids, + ) - return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + selected_data_fields = tuple( + field_name for field_name in self.selected_fields if _reserved_metadata_field(field_name, self.case_sensitive) is None + ) + if selected_data_fields: + data_projection = current_schema.select(*selected_data_fields, case_sensitive=self.case_sensitive) + else: + data_projection = Schema(schema_id=current_schema.schema_id) + + return Schema( + *data_projection.fields, + *metadata_fields, + schema_id=data_projection.schema_id, + identifier_field_ids=data_projection.identifier_field_ids, + ) def use_ref(self: S, name: str) -> S: if self.snapshot_id: @@ -1985,16 +2043,19 @@ class FileScanTask(ScanTask): file: DataFile delete_files: set[DataFile] residual: BooleanExpression + data_sequence_number: int | None def __init__( self, data_file: DataFile, delete_files: set[DataFile] | None = None, residual: BooleanExpression = ALWAYS_TRUE, + data_sequence_number: int | None = None, ) -> None: self.file = data_file self.delete_files = delete_files or set() self.residual = residual + self.data_sequence_number = data_sequence_number @staticmethod def from_rest_response( @@ -2062,6 +2123,8 @@ def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: sort_order_id=rest_file.sort_order_id, ) data_file.spec_id = rest_file.spec_id + if isinstance(rest_file, RESTDataFile): + data_file.first_row_id = rest_file.first_row_id return data_file @@ -2076,10 +2139,22 @@ def _open_manifest( Returns: A list of ManifestEntry that matches the provided filters. """ + entries = list(manifest.fetch_manifest_entry(io, discard_deleted=False)) + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None: + next_row_id = manifest.first_row_id + for manifest_entry in entries: + data_file = manifest_entry.data_file + if data_file.content != DataFileContent.DATA: + continue + if data_file.first_row_id is None: + data_file.first_row_id = next_row_id + next_row_id += data_file.record_count return [ manifest_entry - for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True) - if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file) + for manifest_entry in entries + if manifest_entry.status != ManifestEntryStatus.DELETED + and partition_filter(manifest_entry.data_file) + and metrics_evaluator(manifest_entry.data_file) ] @@ -2219,7 +2294,7 @@ def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa. from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - target_schema = schema_to_pyarrow(self.projection()) + target_schema = schema_to_pyarrow(self.projection(), include_field_ids=False) batches = ArrowScan( self.table_metadata, self.io, @@ -2361,6 +2436,7 @@ def plan_files(self, manifests: Iterable[ManifestFile]) -> Iterable[FileScanTask residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( data_entry.data_file.partition ), + data_sequence_number=data_entry.sequence_number, ) for data_entry in data_entries ] diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 26b6e3d3ad..329d4c3a7a 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -66,7 +66,7 @@ INITIAL_SPEC_ID = 0 DEFAULT_SCHEMA_ID = 0 -SUPPORTED_TABLE_FORMAT_VERSION = 2 +SUPPORTED_TABLE_FORMAT_VERSION = 3 def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]: @@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata: next_row_id: int | None = Field(alias="next-row-id", default=None) """A long higher than all assigned row IDs; the next snapshot's `first-row-id`.""" - def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str: - raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551") - TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")] @@ -645,6 +642,7 @@ def new_table_metadata( properties=properties, last_partition_id=fresh_partition_spec.last_assigned_field_id, table_uuid=table_uuid, + next_row_id=0, ) else: raise ValidationError(f"Unknown format version: {format_version}") diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 64838b0bd6..3da5399aed 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -29,7 +29,7 @@ from pyiceberg.exceptions import CommitFailedException from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil +from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, @@ -322,9 +322,16 @@ def _( return base_metadata updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version}) + updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata) + if ( + isinstance(updated_metadata, TableMetadataV3) + and base_metadata.format_version < 3 + and updated_metadata.next_row_id is None + ): + updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0}) context.add_update(update) - return TableMetadataUtil._construct_without_validation(updated_metadata) + return updated_metadata @_apply_table_update.register(SetPropertiesUpdate) @@ -435,7 +442,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None: raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists") elif ( - base_metadata.format_version == 2 + base_metadata.format_version >= 2 and update.snapshot.sequence_number is not None and update.snapshot.sequence_number <= base_metadata.last_sequence_number and update.snapshot.parent_snapshot_id is not None @@ -446,6 +453,10 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe ) elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None: raise ValueError("Cannot add snapshot without first row id") + elif base_metadata.format_version >= 3 and update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + elif base_metadata.format_version >= 3 and base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") elif ( base_metadata.format_version >= 3 and update.snapshot.first_row_id is not None @@ -458,18 +469,19 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe ) context.add_update(update) - return base_metadata.model_copy( - update={ - "last_updated_ms": update.snapshot.timestamp_ms, - "last_sequence_number": update.snapshot.sequence_number, - "snapshots": base_metadata.snapshots + [update.snapshot], - "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows - if base_metadata.format_version >= 3 - and base_metadata.next_row_id is not None - and update.snapshot.added_rows is not None - else None, - } - ) + metadata_updates: dict[str, Any] = { + "last_updated_ms": update.snapshot.timestamp_ms, + "last_sequence_number": update.snapshot.sequence_number, + "snapshots": base_metadata.snapshots + [update.snapshot], + } + if base_metadata.format_version >= 3: + if base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") + if update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + metadata_updates["next_row_id"] = base_metadata.next_row_id + update.snapshot.added_rows + + return base_metadata.model_copy(update=metadata_updates) @_apply_table_update.register(SetSnapshotRefUpdate) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 7931edacdd..00d40d1a8e 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -21,6 +21,7 @@ from abc import abstractmethod from collections import defaultdict from collections.abc import Callable +from copy import copy from datetime import datetime from functools import cached_property from typing import TYPE_CHECKING, Generic @@ -264,8 +265,9 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ) def _commit(self) -> UpdatesAndRequirements: + table_metadata = self._transaction.table_metadata new_manifests = self._manifests() - next_sequence_number = self._transaction.table_metadata.next_sequence_number() + next_sequence_number = table_metadata.next_sequence_number() summary = self._summary(self.snapshot_properties) file_name = _new_manifest_list_file_name( @@ -276,20 +278,31 @@ def _commit(self) -> UpdatesAndRequirements: location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + if table_metadata.format_version >= 3: + snapshot_first_row_id = table_metadata.next_row_id + if snapshot_first_row_id is None: + raise ValueError("Cannot commit to a v3 table without next-row-id") + else: + snapshot_first_row_id = None + with write_manifest_list( - format_version=self._transaction.table_metadata.format_version, + format_version=table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, sequence_number=next_sequence_number, avro_compression=self._compression, + snapshot_first_row_id=snapshot_first_row_id, ) as writer: writer.add_manifests(new_manifests) - first_row_id: int | None = None - - if self._transaction.table_metadata.format_version >= 3: - first_row_id = self._transaction.table_metadata.next_row_id + if table_metadata.format_version >= 3: + if writer.next_row_id is None or snapshot_first_row_id is None: + raise ValueError("Cannot determine added rows for a v3 snapshot without row IDs") + added_rows = writer.next_row_id - snapshot_first_row_id + else: + added_rows = None + first_row_id = snapshot_first_row_id snapshot = Snapshot( snapshot_id=self._snapshot_id, @@ -297,8 +310,9 @@ def _commit(self) -> UpdatesAndRequirements: manifest_list=manifest_list_file_path, sequence_number=next_sequence_number, summary=summary, - schema_id=self._transaction.table_metadata.current_schema_id, + schema_id=table_metadata.current_schema_id, first_row_id=first_row_id, + added_rows=added_rows, ) add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot) @@ -428,6 +442,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # avoid copying metadata for each evaluator table_metadata = self._transaction.table_metadata + is_v3 = table_metadata.format_version >= 3 schema = table_metadata.schema() manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) @@ -456,16 +471,53 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # It is relevant, let's check out the content deleted_entries = [] existing_entries = [] + # For v3 tables, surviving data files must KEEP the row ids they were + # already assigned. Rewriting the manifest would otherwise let the + # manifest-list writer re-number them (the source manifest's + # first_row_id is dropped). We materialize each surviving file's + # absolute _row_id into DataFile field 142 so it survives the rewrite. + # A file inherits row id = manifest.first_row_id + sum(record_count of + # all preceding data files in the manifest), unless it already carries + # an explicit field-142 value. + row_id_cursor: int | None = manifest_file.first_row_id if is_v3 else None for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + materialized_data_file = entry.data_file + if is_v3: + explicit_first_row_id = materialized_data_file.first_row_id + if explicit_first_row_id is not None: + row_id_cursor = explicit_first_row_id + if row_id_cursor is None: + raise NotImplementedError( + "Cannot perform a v3 copy-on-write delete that preserves row " + "lineage: the source manifest is missing a first_row_id and " + f"the data file {materialized_data_file.file_path} has no " + "explicit field-142 first_row_id, so surviving rows cannot be " + "renumber-safely rewritten." + ) + if explicit_first_row_id is None: + # Persist the inherited absolute row id so it survives the rewrite. + materialized_data_file = copy(materialized_data_file) + materialized_data_file.first_row_id = row_id_cursor if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: # Based on the metadata, it can be dropped right away deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) self._deleted_data_files.add(entry.data_file) else: # Based on the metadata, we cannot determine if it can be deleted - existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + surviving_entry = _copy_with_new_status(entry, ManifestEntryStatus.EXISTING) + if is_v3: + surviving_entry = ManifestEntry.from_args( + status=surviving_entry.status, + snapshot_id=surviving_entry.snapshot_id, + sequence_number=surviving_entry.sequence_number, + file_sequence_number=surviving_entry.file_sequence_number, + data_file=materialized_data_file, + ) + existing_entries.append(surviving_entry) if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH: partial_rewrites_needed = True + if is_v3 and row_id_cursor is not None: + row_id_cursor += materialized_data_file.record_count if len(deleted_entries) > 0: total_deleted_entries += deleted_entries @@ -475,7 +527,15 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> with self.new_manifest_writer(spec=self.spec(manifest_file.partition_spec_id)) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) - existing_manifests.append(writer.to_manifest_file()) + rewritten_manifest = writer.to_manifest_file() + if is_v3: + # Every surviving data file now carries its own field-142 + # row id, so the manifest-list writer must NOT re-assign a + # block. Inherit the source manifest's first_row_id (its + # range start) to keep the manifest itself row-id stable and + # prevent re-numbering of survivors. + rewritten_manifest.first_row_id = manifest_file.first_row_id + existing_manifests.append(rewritten_manifest) else: existing_manifests.append(manifest_file) else: @@ -751,8 +811,27 @@ def _group_by_spec(self, manifests: list[ManifestFile]) -> dict[int, list[Manife return groups def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> ManifestFile: + format_version = self._snapshot_producer._transaction.table_metadata.format_version + # For v3 the merged manifest inherits min(first_row_id) and represents the contiguous + # range [min, min + total_rows). Reading it back assigns row ids to its DATA files in + # ENTRY order, so we must write entries in ascending source-manifest row-id order; + # otherwise inheritance would silently re-number rows. The writer emits manifests + # newest-first, so we sort here. DATA manifests carrying a first_row_id sort by it; + # any without (or delete manifests) keep a stable relative position at the front. + if format_version >= 3: + ordered_bin = sorted( + manifest_bin, + key=lambda manifest: ( + manifest.first_row_id + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + else -1 + ), + ) + else: + ordered_bin = manifest_bin + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: - for manifest in manifest_bin: + for manifest in ordered_bin: for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id: # only files deleted by this snapshot should be added to the new manifest @@ -764,21 +843,85 @@ def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> Ma # add all non-deleted files from the old manifest as existing files writer.existing(entry) - return writer.to_manifest_file() + merged_manifest = writer.to_manifest_file() + inherited_first_row_ids = [ + manifest.first_row_id + for manifest in manifest_bin + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + ] + if inherited_first_row_ids: + merged_manifest.first_row_id = min(inherited_first_row_ids) + + return merged_manifest + + def _v3_row_ids_are_contiguous(self, manifest_bin: list[ManifestFile]) -> bool: + """Whether the v3 DATA manifests cover a single gapless, non-overlapping row-id range. + + A merged manifest inherits ``min(first_row_id)`` and, because the entries are written + in ascending row-id order (see ``_create_manifest``), it represents the contiguous + range ``[min, min + total_rows)``. That is only correct when the source manifests' + ranges tile that interval exactly. We therefore SORT the ranges by ``first_row_id`` + and verify they are gapless/non-overlapping. The input order does NOT matter — the + writer emits manifests newest-first (descending), so requiring ascending input here + would (and previously did) disable v3 merging entirely. + """ + assigned = [ + manifest + for manifest in manifest_bin + if manifest.content == ManifestContent.DATA and manifest.first_row_id is not None + ] + if len(assigned) <= 1: + return True + + sorted_assigned = sorted(assigned, key=lambda manifest: manifest.first_row_id or 0) + + cursor = sorted_assigned[0].first_row_id + if cursor is None: + return False + + for manifest in sorted_assigned: + existing_rows_count = manifest.existing_rows_count + added_rows_count = manifest.added_rows_count + if existing_rows_count is None or added_rows_count is None: + return False + if manifest.first_row_id != cursor: + return False + cursor += existing_rows_count + added_rows_count + + return True def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: list[ManifestFile]) -> list[ManifestFile]: packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) bins: list[list[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) + format_version = self._snapshot_producer._transaction.table_metadata.format_version def merge_bin(manifest_bin: list[ManifestFile]) -> list[ManifestFile]: output_manifests = [] if len(manifest_bin) == 1: output_manifests.append(manifest_bin[0]) + elif ( + format_version >= 3 + and first_manifest in manifest_bin + and first_manifest.content == ManifestContent.DATA + and first_manifest.first_row_id is None + ): + remaining_manifests = [manifest for manifest in manifest_bin if manifest != first_manifest] + output_manifests.append(first_manifest) + if len(remaining_manifests) == 1: + output_manifests.append(remaining_manifests[0]) + elif len(remaining_manifests) < self._min_count_to_merge: + output_manifests.extend(remaining_manifests) + elif not self._v3_row_ids_are_contiguous(remaining_manifests): + output_manifests.extend(remaining_manifests) + else: + output_manifests.append(self._create_manifest(spec_id, remaining_manifests)) elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: # if the bin has the first manifest (the new data files or an appended manifest file) then only # merge it if the number of manifests is above the minimum count. this is applied only to bins # with an in-memory manifest so that large manifests don't prevent merging older groups. output_manifests.extend(manifest_bin) + elif format_version >= 3 and not self._v3_row_ids_are_contiguous(manifest_bin): + output_manifests.extend(manifest_bin) else: output_manifests.append(self._create_manifest(spec_id, manifest_bin)) diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 137215ebc8..359ef339e1 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -164,6 +164,8 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: del v2_entry["file_sequence_number"] del v2_entry["data_file"]["content"] del v2_entry["data_file"]["equality_ids"] + # first_row_id (field 142) is a v3-only DataFile field, not present in the V1/V2 schema. + v2_entry["data_file"].pop("first_row_id", None) # Required in V1 v2_entry["data_file"]["block_size_in_bytes"] = DEFAULT_BLOCK_SIZE @@ -222,7 +224,10 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: fa_entry = next(it) - assert todict(entry) == fa_entry + v2_entry = todict(entry) + # first_row_id (field 142) is a v3-only DataFile field, not present in the V2 schema. + v2_entry["data_file"].pop("first_row_id", None) + assert v2_entry == fa_entry @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7e64e6e7c0..e789ea98f0 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1718,6 +1718,7 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> summary=Summary(Operation.APPEND), schema_id=3, first_row_id=0, + added_rows=10, ) with pytest.raises( @@ -1727,6 +1728,47 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) +def test_add_snapshot_update_fails_without_added_rows(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=1, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot without added rows", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_fails_with_null_table_next_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=1, + added_rows=10, + ) + + with pytest.raises( + ValueError, + match="Cannot add a snapshot when table next-row-id is null", + ): + update_table_metadata( + table_v3.metadata.model_copy(update={"next_row_id": None}), (AddSnapshotUpdate(snapshot=new_snapshot),) + ) + + def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: new_snapshot = Snapshot( snapshot_id=25, diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index c163c90626..6a99ef0c2b 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -184,12 +184,9 @@ def test_serialize_v2(example_table_metadata_v2: dict[str, Any]) -> None: def test_serialize_v3(example_table_metadata_v3: dict[str, Any]) -> None: - # Writing will be part of https://github.com/apache/iceberg-python/issues/1551 - - with pytest.raises(NotImplementedError) as exc_info: - _ = TableMetadataV3(**example_table_metadata_v3).model_dump_json() - - assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value) + table_metadata = TableMetadataV3(**example_table_metadata_v3).model_dump_json() + expected = """{"location":"s3://bucket/test/location","table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true},{"id":4,"name":"u","type":"unknown","required":true},{"id":5,"name":"ns","type":"timestamp_ns","required":true},{"id":6,"name":"nstz","type":"timestamptz_ns","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"current-snapshot-id":3055729675574597004,"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"}},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000},"main":{"snapshot-id":3055729675574597004,"type":"branch"}},"statistics":[],"partition-statistics":[],"format-version":3,"last-sequence-number":34,"next-row-id":1}""" + assert table_metadata == expected def test_migrate_v1_schemas(example_table_metadata_v1: dict[str, Any]) -> None: @@ -837,6 +834,7 @@ def test_new_table_metadata_with_v3_schema() -> None: default_sort_order_id=1, refs={}, format_version=3, + next_row_id=0, ) assert actual.model_dump() == expected.model_dump() diff --git a/tests/table/test_v3_row_id_read.py b/tests/table/test_v3_row_id_read.py new file mode 100644 index 0000000000..7455fbfb2b --- /dev/null +++ b/tests/table/test_v3_row_id_read.py @@ -0,0 +1,277 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pathlib import Path +from typing import cast + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.expressions import AlwaysTrue, GreaterThan +from pyiceberg.expressions.visitors import bind +from pyiceberg.io import FileIO +from pyiceberg.io.pyarrow import PYARROW_PARQUET_FIELD_ID_KEY, PyArrowFileIO, _task_to_record_batches +from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ManifestContent, + ManifestEntry, + ManifestEntryStatus, + ManifestFile, +) +from pyiceberg.schema import ROW_ID_FIELD, ROW_ID_FIELD_ID, Schema +from pyiceberg.table import FileScanTask, Table, _open_manifest +from pyiceberg.types import IntegerType, NestedField, StringType + +SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=2, name="name", field_type=StringType(), required=False), +) + +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), + ] +) + + +def _batch(ids: list[int]) -> pa.Table: + return pa.Table.from_pylist( + [{"id": row_id, "name": f"row-{row_id}"} for row_id in ids], + schema=ARROW_SCHEMA, + ) + + +def _create_table(tmp_path: Path, format_version: str = "3") -> tuple[InMemoryCatalog, Table]: + catalog = InMemoryCatalog(f"row-id-read-{format_version}", warehouse=f"file://{tmp_path}") + catalog.create_namespace("ns") + table = catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": format_version}) + return catalog, table + + +def _data_sequence_numbers(table: Table) -> set[int]: + snapshot = table.metadata.current_snapshot() + assert snapshot is not None + + sequence_numbers: set[int] = set() + for manifest in snapshot.manifests(table.io): + if manifest.content != ManifestContent.DATA: + continue + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True): + assert entry.sequence_number is not None + sequence_numbers.add(entry.sequence_number) + + return sequence_numbers + + +def _manifest_entry(status: ManifestEntryStatus, file_path: str, record_count: int) -> ManifestEntry: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=record_count, + file_size_in_bytes=1, + ) + return ManifestEntry.from_args(status=status, data_file=data_file) + + +def test_v3_scan_select_row_id(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + result = table.scan().select("_row_id").to_arrow() + + assert result.column_names == ["_row_id"] + assert result.column("_row_id").to_pylist() == [0, 1, 2] + + +def test_open_manifest_row_id_inheritance_counts_deleted_null_first_row_id_entries() -> None: + deleted_entry = _manifest_entry(ManifestEntryStatus.DELETED, "deleted.parquet", 5) + live_entry = _manifest_entry(ManifestEntryStatus.ADDED, "live.parquet", 2) + + class ManifestWithDeletedEntry: + content = ManifestContent.DATA + first_row_id = 100 + + def __init__(self) -> None: + self.discard_deleted: bool | None = None + + def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: + self.discard_deleted = discard_deleted + entries = [deleted_entry, live_entry] + if discard_deleted: + return [entry for entry in entries if entry.status != ManifestEntryStatus.DELETED] + return entries + + manifest = ManifestWithDeletedEntry() + + entries = _open_manifest(cast(FileIO, object()), cast(ManifestFile, manifest), lambda _: True, lambda _: True) + + assert manifest.discard_deleted is False + assert [entry.data_file.file_path for entry in entries] == ["live.parquet"] + assert deleted_entry.data_file.first_row_id == 100 + assert live_entry.data_file.first_row_id == 105 + + +def test_v3_scan_row_ids_continue_across_appends(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + table.append(_batch([4, 5])) + table = catalog.load_table("ns.t") + + rows = table.scan().select("id", "_row_id").to_arrow().to_pylist() + + assert {row["id"]: row["_row_id"] for row in rows} == {1: 0, 2: 1, 3: 2, 4: 3, 5: 4} + + +def test_v3_scan_projects_row_id_alongside_real_columns(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + plain = table.scan().select("id", "name").to_arrow() + with_row_id = table.scan().select("id", "name", "_row_id").to_arrow() + + assert with_row_id.column_names == ["id", "name", "_row_id"] + assert with_row_id.select(["id", "name"]).to_pylist() == plain.to_pylist() + assert with_row_id.column("_row_id").to_pylist() == [0, 1, 2] + + +def test_v3_scan_select_last_updated_sequence_number(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + sequence_numbers = _data_sequence_numbers(table) + assert len(sequence_numbers) == 1 + data_sequence_number = next(iter(sequence_numbers)) + + result = table.scan().select("_last_updated_sequence_number").to_arrow() + + assert result.column_names == ["_last_updated_sequence_number"] + assert result.column("_last_updated_sequence_number").to_pylist() == [data_sequence_number] * 3 + + +def test_v2_scan_select_row_id_raises(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path, format_version="2") + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + with pytest.raises(ValueError, match="only available for v3"): + table.scan().select("_row_id").to_arrow() + + +def test_v2_scan_select_last_updated_sequence_number_raises(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path, format_version="2") + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + with pytest.raises(ValueError, match="only available for v3"): + table.scan().select("_last_updated_sequence_number").to_arrow() + + +def test_v3_select_star_does_not_include_row_id(tmp_path: Path) -> None: + catalog, table = _create_table(tmp_path) + table.append(_batch([1, 2, 3])) + table = catalog.load_table("ns.t") + + result = table.scan().select("*").to_arrow() + + assert result.column_names == ["id", "name"] + + +def test_row_id_positions_survive_positional_deletes_and_filter(tmp_path: Path) -> None: + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema) + file_path = str(tmp_path / "row-id-positional-filter.parquet") + with pq.ParquetWriter(file_path, arrow_schema) as writer: + writer.write_table(arrow_table) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=len(arrow_table), + file_size_in_bytes=22, + ) + data_file.first_row_id = 10 + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=False), ROW_ID_FIELD) + positional_deletes = [pa.chunked_array([pa.array([1, 3], type=pa.int64())])] + + batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file, data_sequence_number=7), + bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True), + projected_schema=projected_schema, + table_schema=table_schema, + projected_field_ids={1, ROW_ID_FIELD_ID}, + positional_deletes=positional_deletes, + case_sensitive=True, + ) + ) + + assert len(batches) == 1 + assert batches[0].to_pydict() == {"id": [3, 5], "_row_id": [12, 14]} + + +def test_row_id_null_when_first_row_id_missing(tmp_path: Path) -> None: + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema) + file_path = str(tmp_path / "row-id-null-first-row-id.parquet") + with pq.ParquetWriter(file_path, arrow_schema) as writer: + writer.write_table(arrow_table) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=len(arrow_table), + file_size_in_bytes=22, + ) + # Pre-upgrade snapshots have a null first_row_id; _row_id must read as null per the v3 spec. + data_file.first_row_id = None + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=False), ROW_ID_FIELD) + + batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file, data_sequence_number=7), + bound_row_filter=AlwaysTrue(), + projected_schema=projected_schema, + table_schema=table_schema, + projected_field_ids={1, ROW_ID_FIELD_ID}, + positional_deletes=None, + case_sensitive=True, + ) + ) + + assert len(batches) == 1 + assert batches[0].to_pydict() == {"id": [1, 2, 3, 4, 5], "_row_id": [None, None, None, None, None]} diff --git a/tests/table/test_v3_row_lineage.py b/tests/table/test_v3_row_lineage.py new file mode 100644 index 0000000000..f6abba42b1 --- /dev/null +++ b/tests/table/test_v3_row_lineage.py @@ -0,0 +1,410 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Acceptance tests for v3 write gate + row-lineage assignment. + +T1-foundation: exercises the real local write path end to end, asserting that +v3 row lineage (next-row-id / first-row-id / added-rows) is correct and +monotonic across multiple commits, that next-row-id never silently falls back +to None, and that the full v3 metadata round-trips through JSON. +""" + +import json +from pathlib import Path +from typing import cast + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.manifest import ManifestContent +from pyiceberg.schema import Schema +from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV3 +from pyiceberg.types import IntegerType, NestedField, StringType + + +@pytest.fixture +def v3_catalog(tmp_path: Path) -> Catalog: + return InMemoryCatalog("t1", warehouse=f"file://{tmp_path}") + + +SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=2, name="name", field_type=StringType(), required=False), +) + +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("name", pa.string(), nullable=True), + ] +) + + +def _batch(ids: list[int]) -> pa.Table: + return pa.Table.from_pylist( + [{"id": i, "name": f"row-{i}"} for i in ids], + schema=ARROW_SCHEMA, + ) + + +def test_v3_table_creation_starts_next_row_id_at_zero(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + assert tbl.metadata.format_version == 3 + assert tbl.metadata.next_row_id == 0 + + +def test_v3_append_twice_row_lineage_is_monotonic(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + assert tbl.metadata.next_row_id == 0 + assert tbl.metadata.next_row_id is not None + + # First append: 3 rows. + tbl.append(_batch([1, 2, 3])) + tbl = v3_catalog.load_table("ns.t") + + snap1 = tbl.metadata.current_snapshot() + assert snap1 is not None + assert snap1.first_row_id == 0, "first snapshot must start assigning at row id 0" + assert snap1.added_rows == 3, "added_rows must reflect the 3 rows written, not None" + assert tbl.metadata.next_row_id == 3, "next_row_id must advance by added rows" + assert tbl.metadata.next_row_id is not None, "next_row_id must NEVER fall back to None on v3" + + # Second append: 2 rows. + tbl.append(_batch([4, 5])) + tbl = v3_catalog.load_table("ns.t") + + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) + snap2 = snaps[-1] + assert snap2.first_row_id == 3, "second snapshot must start where the first left off" + assert snap2.added_rows == 2 + assert tbl.metadata.next_row_id == 5, "next_row_id must be strictly monotonically increasing" + + # Monotonicity across all snapshots: each first_row_id + added_rows chains. + running = 0 + for s in snaps: + assert s.first_row_id is not None + assert s.added_rows is not None + assert s.first_row_id == running, f"gap/overlap in row-id assignment at snapshot {s.snapshot_id}" + running += s.added_rows + assert running == tbl.metadata.next_row_id + + +def test_v3_merge_append_does_not_double_count_existing_rows(v3_catalog: Catalog, monkeypatch: pytest.MonkeyPatch) -> None: + """Merge-append must actually MERGE manifests and must NOT double-count existing rows. + + This is the #3070 double-count regression guard. We instrument the merge manager so the + test fails if no manifest merge ever happens (the historical bug: v3 merging was silently + disabled by a descending-vs-ascending ordering check, so this fix was dead code). + """ + from pyiceberg.table.update import snapshot as snapshot_module + + merge_calls = {"count": 0} + original_create = snapshot_module._ManifestMergeManager._create_manifest + + def _counting_create(self, spec_id, manifest_bin): # type: ignore[no-untyped-def] + merge_calls["count"] += 1 + return original_create(self, spec_id, manifest_bin) + + monkeypatch.setattr(snapshot_module._ManifestMergeManager, "_create_manifest", _counting_create) + + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table( + "ns.t", + schema=SCHEMA, + properties={ + "format-version": "3", + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "1", + }, + ) + + for ids in ([1, 2, 3], [4, 5], [6, 7, 8, 9]): + tbl.append(_batch(ids)) + tbl = v3_catalog.load_table("ns.t") + + # The merge path MUST have run at least once; otherwise the double-count fix is dead code. + assert merge_calls["count"] > 0, "v3 manifest merge never ran — merging is silently disabled" + + # The data manifests must have been compacted (3 appends -> fewer than 3 DATA manifests). + snap = tbl.metadata.current_snapshot() + assert snap is not None + data_manifests = [m for m in snap.manifests(tbl.io) if m.content == ManifestContent.DATA] + assert len(data_manifests) < 3, "manifests were not actually merged" + + snaps = sorted(tbl.metadata.snapshots, key=lambda s: s.sequence_number or 0) + added_rows = [s.added_rows for s in snaps] + + # next_row_id == 9 proves no double-counting (9 real rows, not 17). + assert tbl.metadata.next_row_id == 9 + assert added_rows == [3, 2, 4] + assert all(rows is not None for rows in added_rows) + assert sum(rows for rows in added_rows if rows is not None) == tbl.metadata.next_row_id + + # The merged manifests must tile [0, 9) exactly, with each data file's row range coherent. + assigned = sorted( + ((cast(int, m.first_row_id), cast(int, m.existing_rows_count) + cast(int, m.added_rows_count)) for m in data_manifests), + key=lambda pair: pair[0], + ) + cursor = 0 + for first_row_id, rows in assigned: + assert first_row_id == cursor, "merged manifest row-id ranges have a gap/overlap" + cursor += rows + assert cursor == 9 + + # The data is still fully readable and correct after merging. + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [1, 2, 3, 4, 5, 6, 7, 8, 9] + + +def test_v3_manifest_carries_first_row_id(v3_catalog: Catalog) -> None: + """The data manifest in the manifest list must be assigned a first_row_id per spec.""" + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([1, 2, 3])) + tbl = v3_catalog.load_table("ns.t") + + snap = tbl.metadata.current_snapshot() + assert snap is not None + manifests = snap.manifests(tbl.io) + data_manifests = [m for m in manifests if m.content == ManifestContent.DATA] + assert len(data_manifests) >= 1 + # The first (only) data manifest must carry the snapshot's first_row_id (0). + assert data_manifests[0].first_row_id == 0 + + +def test_v3_metadata_round_trips_through_json(v3_catalog: Catalog) -> None: + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([1, 2, 3])) + tbl.append(_batch([4, 5])) + tbl = v3_catalog.load_table("ns.t") + + meta = tbl.metadata + assert isinstance(meta, TableMetadataV3) + + dumped = meta.model_dump_json() + reparsed = TableMetadataUtil.parse_raw(dumped) + assert isinstance(reparsed, TableMetadataV3) + assert reparsed.next_row_id == meta.next_row_id == 5 + # Round-trip equality of the model. + assert reparsed == meta + # The serialized JSON must include next-row-id. + assert json.loads(dumped)["next-row-id"] == 5 + + +def _data_file_row_ids(tbl: object) -> list[tuple[int | None, int | None, int]]: + """Return (manifest.first_row_id, data_file.first_row_id (field 142), record_count) for DATA entries.""" + snap = tbl.metadata.current_snapshot() # type: ignore[attr-defined] + out: list[tuple[int | None, int | None, int]] = [] + for m in snap.manifests(tbl.io): # type: ignore[attr-defined] + if m.content != ManifestContent.DATA: + continue + for entry in m.fetch_manifest_entry(tbl.io, discard_deleted=True): # type: ignore[attr-defined] + out.append((m.first_row_id, entry.data_file.first_row_id, entry.data_file.record_count)) + return out + + +def test_v3_whole_file_delete_does_not_renumber_surviving_rows(v3_catalog: Catalog) -> None: + """Copy-on-write whole-file delete must NEVER re-number the surviving rows. + + Two separate data files are written (row ids [0,1,2] and [3,4,5]). Deleting the whole + first file (predicate aligned to a full file) must: + - leave next_row_id unchanged (0 new rows assigned), and + - keep the surviving file's row-id lineage intact (manifest first_row_id == 3, and the + materialized data-file field-142 first_row_id == 3) — NOT renumbered to 0. + This asserts on the _row_id (field 142 / manifest first_row_id), not the user `id` column. + """ + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + # Two separate files: ids 0,1,2 (row ids 0-2) then ids 10,11,12 (row ids 3-5). + tbl.append(_batch([0, 1, 2])) + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([10, 11, 12])) + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + + before = sorted(_data_file_row_ids(tbl), key=lambda triple: triple[0] or 0) + # manifest first_row_ids should be 0 and 3. + assert [triple[0] for triple in before] == [0, 3] + + # Delete the entire first file (all ids < 5). This is a metadata-only (whole-file) delete. + tbl.delete(delete_filter="id < 5") + tbl = v3_catalog.load_table("ns.t") + + # next_row_id must NOT advance — zero rows were newly assigned. + assert tbl.metadata.next_row_id == 6, "whole-file delete must not assign new row ids" + delete_snap = tbl.metadata.current_snapshot() + assert delete_snap is not None + assert delete_snap.added_rows == 0, "a delete must report 0 added rows, not re-numbered survivors" + + # The surviving file must KEEP its original row-id lineage (manifest first_row_id == 3). + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + surviving_manifest_frid, surviving_datafile_frid, surviving_rows = surviving[0] + assert surviving_rows == 3 + assert surviving_manifest_frid == 3, "surviving rows must keep their original first_row_id (3), not be renumbered" + + # Data correctness. + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [10, 11, 12] + + +def test_v3_whole_file_delete_with_two_survivors_renumbers_none(v3_catalog: Catalog) -> None: + """Three files; delete the middle file wholesale; the two survivors keep their row ids.""" + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + tbl.append(_batch([0, 1])) # row ids 0-1 + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([10, 11])) # row ids 2-3 (deleted) + tbl = v3_catalog.load_table("ns.t") + tbl.append(_batch([20, 21])) # row ids 4-5 + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + + # delete the middle file (ids 10,11) wholesale using a bound-provable range predicate + # (min=10,max=11 fully inside [10,20)), so this is a metadata-only whole-file delete. + tbl.delete(delete_filter="id >= 10 and id < 20") + tbl = v3_catalog.load_table("ns.t") + + assert tbl.metadata.next_row_id == 6, "no new row ids on a whole-file delete" + frids = sorted(cast(int, triple[0]) for triple in _data_file_row_ids(tbl)) + # survivors must keep first_row_ids 0 and 4 (NOT renumbered to 0 and 2). + assert frids == [0, 4], "survivors were re-numbered after deleting the middle file" + actual_ids = sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) + assert actual_ids == [0, 1, 20, 21] + + +def test_v3_whole_file_delete_in_shared_manifest_preserves_survivor_row_ids(v3_catalog: Catalog) -> None: + """When two data files share ONE manifest and one is deleted wholesale, the survivor must + keep its row-id lineage. + + This is the strongest delete-lineage guard: the surviving file is REWRITTEN into a new + manifest (the shared source manifest is dropped), so without preserving lineage the + manifest-list writer renumbers the survivor (the historical bug: next_row_id jumped 6->9, + survivor block frid 0->6). The fix materializes the survivor's absolute _row_id into + DataFile field 142 and inherits the source manifest's first_row_id. + """ + import itertools + import uuid + + from pyiceberg.io.pyarrow import _dataframe_to_data_files + + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + + # Two data files in a SINGLE manifest (one fast-append commit): row ids 0-2 and 3-5. + with tbl.transaction() as txn: + with txn.update_snapshot().fast_append() as fast_append: + for ids in ([0, 1, 2], [100, 101, 102]): + for data_file in _dataframe_to_data_files( + io=tbl.io, + df=_batch(ids), + table_metadata=txn.table_metadata, + write_uuid=uuid.uuid4(), + counter=itertools.count(), + ): + fast_append.append_data_file(data_file) + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + # one shared manifest with first_row_id == 0 + assert [m[0] for m in _data_file_row_ids(tbl)] == [0, 0] + + # Whole-file delete the first file (ids 0,1,2): bound-provable (max=2 < 5). + tbl.delete(delete_filter="id < 5") + tbl = v3_catalog.load_table("ns.t") + + # next_row_id must NOT advance and no new rows are added. + assert tbl.metadata.next_row_id == 6, "survivor was re-numbered (next_row_id advanced)" + delete_snap = tbl.metadata.current_snapshot() + assert delete_snap is not None + assert delete_snap.added_rows == 0 + + surviving = _data_file_row_ids(tbl) + assert len(surviving) == 1 + manifest_frid, datafile_frid, rows = surviving[0] + assert rows == 3 + # The rewritten manifest inherits the source manifest's first_row_id (0)... + assert manifest_frid == 0, "rewritten manifest must inherit the source manifest first_row_id" + # ...and the survivor's absolute _row_id (3) is materialized into field 142. + assert datafile_frid == 3, "survivor's _row_id (field 142) must be materialized to its original value (3)" + + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [100, 101, 102] + + +def test_v3_partial_rewrite_delete_fails_loudly(v3_catalog: Catalog) -> None: + """A copy-on-write delete that needs to REWRITE a data file must fail loudly on v3. + + Preserving _row_id lineage across a physical rewrite needs materialized per-row _row_id + columns, which PyIceberg does not have. Rather than silently re-numbering survivors, the + v3 path raises NotImplementedError. + """ + v3_catalog.create_namespace("ns") + tbl = v3_catalog.create_table("ns.t", schema=SCHEMA, properties={"format-version": "3"}) + # Single file containing 0..5; deleting a subset forces a partial rewrite. + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = v3_catalog.load_table("ns.t") + + with pytest.raises(NotImplementedError, match="copy-on-write delete"): + tbl.delete(delete_filter="id in (2, 3)") + + # The table state is unchanged (no corruption, no renumbering). + tbl = v3_catalog.load_table("ns.t") + assert tbl.metadata.next_row_id == 6 + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 2, 3, 4, 5] + + +def test_v2_partial_rewrite_delete_still_works(tmp_path: Path) -> None: + """The loud v3 failure must NOT regress v2 copy-on-write deletes.""" + cat = InMemoryCatalog("t1v2", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + tbl.append(_batch([0, 1, 2, 3, 4, 5])) + tbl = cat.load_table("ns.t") + tbl.delete(delete_filter="id in (2, 3)") + tbl = cat.load_table("ns.t") + assert sorted(row["id"] for row in tbl.scan().to_arrow().to_pylist()) == [0, 1, 4, 5] + + +def test_v3_upgrade_from_v2_seeds_next_row_id(tmp_path: Path) -> None: + """v2 -> v3 upgrade must be reachable via the public API and seed next_row_id = 0.""" + cat = InMemoryCatalog("t1up", warehouse=f"file://{tmp_path}") + cat.create_namespace("ns") + tbl = cat.create_table("ns.t", schema=SCHEMA, properties={"format-version": "2"}) + assert tbl.metadata.format_version == 2 + # v2 metadata does not carry next-row-id at all. + assert getattr(tbl.metadata, "next_row_id", None) is None + + with tbl.transaction() as txn: + txn.upgrade_table_version(3) + tbl = cat.load_table("ns.t") + + assert tbl.metadata.format_version == 3 + assert tbl.metadata.next_row_id == 0, "upgrade to v3 must seed next_row_id at 0" + + # And the upgraded table must support a v3 append with correct row lineage. + tbl.append(_batch([1, 2, 3])) + tbl = cat.load_table("ns.t") + snap = tbl.metadata.current_snapshot() + assert snap is not None + assert snap.first_row_id == 0 + assert snap.added_rows == 3 + assert tbl.metadata.next_row_id == 3