From 90c573bbec4e39fbf61f40cd6e1a20b4312115ff Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 19:03:30 -0500 Subject: [PATCH 1/3] Produce position deletes for Table.delete in merge-on-read mode When write.delete.mode=merge-on-read on a format-version 2 table, Transaction.delete(predicate) now produces position deletes (via the PR2 writer + row-delta producer) instead of warning and falling back to copy-on-write. Copy-on-write remains the default; merge-on-read is opt-in. _delete_merge_on_read scans each matched data file's RAW physical rows (no delete files applied), assigns 0-based positions, applies the predicate, subtracts positions already covered by existing delete files (idempotent), and commits one position-delete file per data file as a single OVERWRITE row-delta snapshot. Data files are never rewritten; the delete inherits the new snapshot sequence number so it cannot apply to data appended afterward. Non-v2 tables under merge-on-read still warn and fall back to copy-on-write. --- pyiceberg/table/__init__.py | 74 ++++++- tests/table/test_mor_delete.py | 361 +++++++++++++++++++++++++++++++++ 2 files changed, 432 insertions(+), 3 deletions(-) create mode 100644 tests/table/test_mor_delete.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..b9f3dbd34f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -716,15 +716,18 @@ def delete( """ from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow + if isinstance(delete_filter, str): + delete_filter = _parse_row_filter(delete_filter) + if ( self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT) == TableProperties.DELETE_MODE_MERGE_ON_READ ): + if self.table_metadata.format_version == 2: + self._delete_merge_on_read(delete_filter, snapshot_properties, case_sensitive, branch) + return warnings.warn("Merge on read is not yet supported, falling back to copy-on-write", stacklevel=2) - if isinstance(delete_filter, str): - delete_filter = _parse_row_filter(delete_filter) - with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: delete_snapshot.delete_by_predicate(delete_filter, case_sensitive) @@ -790,6 +793,71 @@ def delete( if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed: warnings.warn("Delete operation did not match any records", stacklevel=2) + def _delete_merge_on_read( + self, + delete_filter: BooleanExpression, + snapshot_properties: dict[str, str] = EMPTY_DICT, + case_sensitive: bool = True, + branch: str | None = MAIN_BRANCH, + ) -> None: + import pyarrow as pa + + from pyiceberg.io.pyarrow import ArrowScan, _read_all_delete_files, expression_to_pyarrow, write_position_delete_file + + file_scan = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive) + if branch is not None: + file_scan = file_scan.use_ref(branch) + tasks = list(file_scan.plan_files()) + + bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) + pyarrow_filter = expression_to_pyarrow(bound_delete_filter, self.table_metadata.schema()) + deletes_per_file = _read_all_delete_files(self._table.io, tasks) + positions_by_data_file: dict[DataFile, set[int]] = {} + + raw_scan = ArrowScan( + table_metadata=self.table_metadata, + io=self._table.io, + projected_schema=self.table_metadata.schema(), + row_filter=AlwaysTrue(), + case_sensitive=case_sensitive, + ) + + for task in tasks: + existing_deleted_positions: set[int] = set() + for positions in deletes_per_file.get(task.file.file_path, []): + existing_deleted_positions.update(int(pos) for pos in positions.to_pylist()) + + current_index = 0 + raw_task = FileScanTask(task.file, delete_files=set(), residual=AlwaysTrue()) + for batch in raw_scan.to_record_batches([raw_task]): + row_positions = pa.array(range(current_index, current_index + batch.num_rows), type=pa.int64()) + current_index += batch.num_rows + + batch_with_positions = pa.Table.from_batches([batch]).append_column("__pyiceberg_position", row_positions) + matching_positions = batch_with_positions.filter(pyarrow_filter).column("__pyiceberg_position").to_pylist() + positions_to_delete = {int(pos) for pos in matching_positions if int(pos) not in existing_deleted_positions} + + if positions_to_delete: + positions_by_data_file.setdefault(task.file, set()).update(positions_to_delete) + + if not positions_by_data_file: + warnings.warn("Delete operation did not match any records", stacklevel=2) + return + + counter = itertools.count(0) + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).row_delta() as producer: + for data_file, positions in positions_by_data_file.items(): + producer.append_delete_file( + write_position_delete_file( + io=self._table.io, + table_metadata=self.table_metadata, + referenced_data_file=data_file, + positions=sorted(positions), + write_uuid=producer.commit_uuid, + counter=counter, + ) + ) + def upsert( self, df: pa.Table, diff --git a/tests/table/test_mor_delete.py b/tests/table/test_mor_delete.py new file mode 100644 index 0000000000..aa516ab6a5 --- /dev/null +++ b/tests/table/test_mor_delete.py @@ -0,0 +1,361 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import NamespaceAlreadyExistsError +from pyiceberg.expressions import EqualTo, In +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import Table, TableProperties +from pyiceberg.table.snapshots import Operation +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import LongType, NestedField, StringType + +ICEBERG_SCHEMA = Schema( + NestedField(1, "id", LongType(), required=True), + NestedField(2, "data", StringType(), required=True), +) +PARTITIONED_SCHEMA = Schema( + NestedField(1, "id", LongType(), required=True), + NestedField(2, "data", StringType(), required=True), + NestedField(3, "category", StringType(), required=True), +) +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("data", pa.string(), nullable=False), + ] +) +PARTITIONED_ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("data", pa.string(), nullable=False), + pa.field("category", pa.string(), nullable=False), + ] +) +PARTITION_SPEC = PartitionSpec(PartitionField(source_id=3, field_id=1000, transform=IdentityTransform(), name="category")) + + +def _arrow_table(rows: list[dict[str, object]], schema: pa.Schema = ARROW_SCHEMA) -> pa.Table: + return pa.Table.from_pylist(rows, schema=schema) + + +def _create_table( + catalog: Catalog, + identifier: str, + *, + format_version: int = 2, + merge_on_read: bool = True, + schema: Schema = ICEBERG_SCHEMA, + partition_spec: PartitionSpec | None = None, +) -> Table: + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + + properties = {TableProperties.FORMAT_VERSION: str(format_version)} + if merge_on_read: + properties[TableProperties.DELETE_MODE] = TableProperties.DELETE_MODE_MERGE_ON_READ + + return catalog.create_table( + identifier=identifier, + schema=schema, + partition_spec=partition_spec or PartitionSpec(), + properties=properties, + ) + + +def _append_rows(table: Table, rows: list[dict[str, object]], schema: pa.Schema = ARROW_SCHEMA) -> None: + table.append(_arrow_table(rows, schema=schema)) + + +def _current_data_files(table: Table) -> list[DataFile]: + snapshot = table.current_snapshot() + assert snapshot is not None + return [ + entry.data_file + for manifest in snapshot.manifests(io=table.io) + if manifest.content == ManifestContent.DATA + for entry in manifest.fetch_manifest_entry(io=table.io) + if entry.data_file.content == DataFileContent.DATA + ] + + +def _current_delete_files(table: Table) -> list[DataFile]: + snapshot = table.current_snapshot() + assert snapshot is not None + return [ + entry.data_file + for manifest in snapshot.manifests(io=table.io) + if manifest.content == ManifestContent.DELETES + for entry in manifest.fetch_manifest_entry(io=table.io) + if entry.data_file.content == DataFileContent.POSITION_DELETES + ] + + +def _current_manifests_with_content(table: Table, content: ManifestContent) -> list[ManifestFile]: + snapshot = table.current_snapshot() + assert snapshot is not None + return [manifest for manifest in snapshot.manifests(io=table.io) if manifest.content == content] + + +def _data_paths(table: Table) -> set[str]: + return {data_file.file_path for data_file in _current_data_files(table)} + + +def _rows(table: Table) -> list[dict[str, object]]: + return sorted(table.scan().to_arrow().to_pylist(), key=lambda row: (row["id"], row.get("data", ""))) + + +def test_mor_delete_basic_produces_position_delete_without_rewriting_data_files(catalog: Catalog) -> None: + identifier = "default.test_mor_delete_basic" + table = _create_table(catalog, identifier) + _append_rows( + table, + [ + {"id": 1, "data": "a"}, + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + ], + ) + before_data_paths = _data_paths(table) + + table.delete(EqualTo("id", 2)) + + assert _rows(table) == [{"id": 1, "data": "a"}, {"id": 3, "data": "c"}] + assert _data_paths(table) == before_data_paths + assert len(_current_delete_files(table)) == 1 + assert len(_current_manifests_with_content(table, ManifestContent.DELETES)) == 1 + + current_snapshot = table.current_snapshot() + assert current_snapshot is not None + assert current_snapshot.summary.operation == Operation.OVERWRITE + + reloaded = catalog.load_table(identifier) + assert _rows(reloaded) == [{"id": 1, "data": "a"}, {"id": 3, "data": "c"}] + + +def test_mor_delete_multiple_rows_in_one_file(catalog: Catalog) -> None: + table = _create_table(catalog, "default.test_mor_delete_multiple_rows") + _append_rows( + table, + [ + {"id": 1, "data": "a"}, + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + {"id": 4, "data": "d"}, + ], + ) + + table.delete(In("id", (2, 4))) + + assert _rows(table) == [{"id": 1, "data": "a"}, {"id": 3, "data": "c"}] + delete_files = _current_delete_files(table) + assert len(delete_files) == 1 + assert delete_files[0].record_count == 2 + + +def test_partitioned_mor_delete_writes_delete_file_per_affected_partition(catalog: Catalog) -> None: + table = _create_table( + catalog, + "default.test_partitioned_mor_delete", + schema=PARTITIONED_SCHEMA, + partition_spec=PARTITION_SPEC, + ) + _append_rows( + table, + [ + {"id": 1, "data": "a", "category": "alpha"}, + {"id": 2, "data": "b", "category": "alpha"}, + {"id": 3, "data": "c", "category": "beta"}, + {"id": 4, "data": "d", "category": "beta"}, + {"id": 5, "data": "e", "category": "gamma"}, + ], + schema=PARTITIONED_ARROW_SCHEMA, + ) + before_data_paths = _data_paths(table) + + table.delete(In("id", (2, 4))) + + assert _rows(table) == [ + {"id": 1, "data": "a", "category": "alpha"}, + {"id": 3, "data": "c", "category": "beta"}, + {"id": 5, "data": "e", "category": "gamma"}, + ] + assert _data_paths(table) == before_data_paths + + delete_files = _current_delete_files(table) + assert len(delete_files) == 2 + assert {delete_file.partition[0] for delete_file in delete_files} == {"alpha", "beta"} + assert {delete_file.record_count for delete_file in delete_files} == {1} + assert "gamma" not in {delete_file.partition[0] for delete_file in delete_files} + + +def test_mor_delete_sequence_number_scopes_delete_to_existing_data_files(catalog: Catalog) -> None: + table = _create_table(catalog, "default.test_mor_delete_sequence_number_scoping") + _append_rows( + table, + [ + {"id": 1, "data": "old-a"}, + {"id": 2, "data": "old-b"}, + {"id": 3, "data": "old-c"}, + ], + ) + + table.delete(EqualTo("id", 2)) + delete_snapshot = table.current_snapshot() + assert delete_snapshot is not None + + _append_rows(table, [{"id": 2, "data": "new-b"}, {"id": 4, "data": "new-d"}]) + + assert _rows(table) == [ + {"id": 1, "data": "old-a"}, + {"id": 2, "data": "new-b"}, + {"id": 3, "data": "old-c"}, + {"id": 4, "data": "new-d"}, + ] + assert len(_current_delete_files(table)) == 1 + + append_snapshot = table.current_snapshot() + assert append_snapshot is not None + assert append_snapshot.sequence_number == delete_snapshot.sequence_number + 1 + + +def test_successive_mor_deletes_do_not_reemit_already_deleted_positions(catalog: Catalog) -> None: + table = _create_table(catalog, "default.test_successive_mor_deletes") + _append_rows( + table, + [ + {"id": 1, "data": "a"}, + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + {"id": 4, "data": "d"}, + ], + ) + + table.delete(EqualTo("id", 2)) + table.delete(In("id", (2, 4))) + + assert _rows(table) == [{"id": 1, "data": "a"}, {"id": 3, "data": "c"}] + delete_files = _current_delete_files(table) + assert len(delete_files) == 2 + assert sum(delete_file.record_count for delete_file in delete_files) == 2 + assert sorted(delete_file.record_count for delete_file in delete_files) == [1, 1] + + +def test_default_delete_mode_is_copy_on_write(catalog: Catalog) -> None: + table = _create_table(catalog, "default.test_default_delete_mode_is_copy_on_write", merge_on_read=False) + _append_rows( + table, + [ + {"id": 1, "data": "a"}, + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + ], + ) + before_data_paths = _data_paths(table) + + table.delete(EqualTo("id", 2)) + + assert _rows(table) == [{"id": 1, "data": "a"}, {"id": 3, "data": "c"}] + assert _data_paths(table) != before_data_paths + assert _current_delete_files(table) == [] + + +def test_mor_delete_on_v1_warns_and_falls_back_to_copy_on_write(catalog: Catalog) -> None: + table = _create_table(catalog, "default.test_mor_delete_v1_fallback", format_version=1) + _append_rows( + table, + [ + {"id": 1, "data": "a"}, + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + ], + ) + before_data_paths = _data_paths(table) + + with pytest.warns(UserWarning, match="Merge on read is not yet supported, falling back to copy-on-write"): + table.delete(EqualTo("id", 2)) + + assert _rows(table) == [{"id": 1, "data": "a"}, {"id": 3, "data": "c"}] + assert _data_paths(table) != before_data_paths + assert _current_delete_files(table) == [] + + +def test_mor_delete_no_match_warns_and_does_not_create_snapshot(catalog: Catalog) -> None: + table = _create_table(catalog, "default.test_mor_delete_no_match") + _append_rows(table, [{"id": 1, "data": "a"}, {"id": 2, "data": "b"}]) + before_snapshot = table.current_snapshot() + assert before_snapshot is not None + before_snapshot_count = len(table.snapshots()) + + with pytest.warns(UserWarning, match="Delete operation did not match any records"): + table.delete(EqualTo("id", 99)) + + after_snapshot = table.current_snapshot() + assert after_snapshot is not None + assert after_snapshot.snapshot_id == before_snapshot.snapshot_id + assert len(table.snapshots()) == before_snapshot_count + assert _current_delete_files(table) == [] + + +def test_mor_delete_position_alignment_across_multiple_record_batches(catalog: Catalog) -> None: + # A position-delete `pos` is a GLOBAL 0-based physical row index into the data file. + # When a single data file is read in multiple record batches, the running position + # accounting must be global, not batch-local. Force small row groups so the file is + # read in many batches, then delete a row well past the first batch and assert exactly + # that row is gone. A batch-local position bug would silently delete the wrong row. + table = _create_table( + catalog, + "default.test_mor_delete_multi_batch_alignment", + # Small parquet row group + batch size so a single file spans many batches on read. + ) + table = ( + table.transaction() + .set_properties( + { + TableProperties.PARQUET_ROW_GROUP_LIMIT: "8", + TableProperties.PARQUET_PAGE_ROW_LIMIT: "8", + } + ) + .commit_transaction() + ) + + rows = [{"id": i, "data": f"v{i}"} for i in range(200)] + _append_rows(table, rows) + data_files = _current_data_files(table) + assert len(data_files) == 1 # single physical file; positions are global within it + before_paths = _data_paths(table) + + # id=137 sits far past the first batch; its physical position equals 137. + table.delete(EqualTo("id", 137)) + + remaining_ids = sorted(row["id"] for row in table.scan().to_arrow().to_pylist()) + assert 137 not in remaining_ids + assert remaining_ids == [i for i in range(200) if i != 137] + # No rewrite, and exactly one matched position recorded. + assert _data_paths(table) == before_paths + delete_files = _current_delete_files(table) + assert len(delete_files) == 1 + assert delete_files[0].record_count == 1 From 5cbe03148a443e2c263c8c3e91829c328fbf1936 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 20:21:59 -0500 Subject: [PATCH 2/3] Make MoR delete position column collision-proof and strengthen tests --- pyiceberg/table/__init__.py | 8 +++-- tests/table/test_mor_delete.py | 54 ++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b9f3dbd34f..63b06e253e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -833,8 +833,12 @@ def _delete_merge_on_read( row_positions = pa.array(range(current_index, current_index + batch.num_rows), type=pa.int64()) current_index += batch.num_rows - batch_with_positions = pa.Table.from_batches([batch]).append_column("__pyiceberg_position", row_positions) - matching_positions = batch_with_positions.filter(pyarrow_filter).column("__pyiceberg_position").to_pylist() + position_column_name = "__pyiceberg_position" + while position_column_name in batch.schema.names: + position_column_name += "_" + + batch_with_positions = pa.Table.from_batches([batch]).append_column(position_column_name, row_positions) + matching_positions = batch_with_positions.filter(pyarrow_filter).column(position_column_name).to_pylist() positions_to_delete = {int(pos) for pos in matching_positions if int(pos) not in existing_deleted_positions} if positions_to_delete: diff --git a/tests/table/test_mor_delete.py b/tests/table/test_mor_delete.py index aa516ab6a5..3bf4a0a610 100644 --- a/tests/table/test_mor_delete.py +++ b/tests/table/test_mor_delete.py @@ -227,6 +227,17 @@ def test_mor_delete_sequence_number_scopes_delete_to_existing_data_files(catalog delete_snapshot = table.current_snapshot() assert delete_snapshot is not None + # The position-delete file's sequence number must equal the delete snapshot's sequence + # number. This is what scopes the delete to pre-existing data files only: a positional + # delete applies to data with a data-sequence-number <= the delete's sequence number. + delete_entry_sequence_numbers = [ + entry.sequence_number + for manifest in delete_snapshot.manifests(io=table.io) + if manifest.content == ManifestContent.DELETES + for entry in manifest.fetch_manifest_entry(io=table.io) + ] + assert delete_entry_sequence_numbers == [delete_snapshot.sequence_number] + _append_rows(table, [{"id": 2, "data": "new-b"}, {"id": 4, "data": "new-d"}]) assert _rows(table) == [ @@ -240,6 +251,8 @@ def test_mor_delete_sequence_number_scopes_delete_to_existing_data_files(catalog append_snapshot = table.current_snapshot() assert append_snapshot is not None assert append_snapshot.sequence_number == delete_snapshot.sequence_number + 1 + # The newly appended data has a strictly higher data sequence number than the delete + # file, so the re-inserted id=2 survives even though it shares the deleted value. def test_successive_mor_deletes_do_not_reemit_already_deleted_positions(catalog: Catalog) -> None: @@ -320,6 +333,42 @@ def test_mor_delete_no_match_warns_and_does_not_create_snapshot(catalog: Catalog assert _current_delete_files(table) == [] +def test_mor_delete_with_user_column_named_like_internal_position(catalog: Catalog) -> None: + # The MoR delete path appends a temporary position column to compute matching rows. + # If a real table column shares that internal name, the path must not collide or + # mistakenly read the user column as positions. + schema = Schema( + NestedField(1, "id", LongType(), required=True), + NestedField(2, "__pyiceberg_position", LongType(), required=True), + ) + arrow_schema = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("__pyiceberg_position", pa.int64(), nullable=False), + ] + ) + table = _create_table(catalog, "default.test_mor_delete_position_name_collision", schema=schema) + _append_rows( + table, + [ + {"id": 1, "__pyiceberg_position": 100}, + {"id": 2, "__pyiceberg_position": 200}, + {"id": 3, "__pyiceberg_position": 300}, + ], + schema=arrow_schema, + ) + before_data_paths = _data_paths(table) + + table.delete(EqualTo("id", 2)) + + assert sorted(table.scan().to_arrow().to_pylist(), key=lambda row: row["id"]) == [ + {"id": 1, "__pyiceberg_position": 100}, + {"id": 3, "__pyiceberg_position": 300}, + ] + assert _data_paths(table) == before_data_paths + assert len(_current_delete_files(table)) == 1 + + def test_mor_delete_position_alignment_across_multiple_record_batches(catalog: Catalog) -> None: # A position-delete `pos` is a GLOBAL 0-based physical row index into the data file. # When a single data file is read in multiple record batches, the running position @@ -348,6 +397,11 @@ def test_mor_delete_position_alignment_across_multiple_record_batches(catalog: C assert len(data_files) == 1 # single physical file; positions are global within it before_paths = _data_paths(table) + # Guard the test's own premise: the single file must actually be read in multiple + # batches, otherwise a batch-local position regression would never be exercised. + batch_count = sum(1 for _ in table.scan().to_arrow_batch_reader()) + assert batch_count > 1, "expected the data file to be read in multiple record batches" + # id=137 sits far past the first batch; its physical position equals 137. table.delete(EqualTo("id", 137)) From 359be195c4beabba411c04d517993fbe14500948 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Wed, 24 Jun 2026 08:27:39 -0500 Subject: [PATCH 3/3] chore: satisfy mypy on MoR delete tests --- tests/table/test_mor_delete.py | 2 ++ tests/table/test_mor_row_delta_producer.py | 2 ++ tests/utils/test_manifest.py | 3 ++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/table/test_mor_delete.py b/tests/table/test_mor_delete.py index 3bf4a0a610..e8211ce791 100644 --- a/tests/table/test_mor_delete.py +++ b/tests/table/test_mor_delete.py @@ -150,6 +150,7 @@ def test_mor_delete_basic_produces_position_delete_without_rewriting_data_files( current_snapshot = table.current_snapshot() assert current_snapshot is not None + assert current_snapshot.summary is not None assert current_snapshot.summary.operation == Operation.OVERWRITE reloaded = catalog.load_table(identifier) @@ -226,6 +227,7 @@ def test_mor_delete_sequence_number_scopes_delete_to_existing_data_files(catalog table.delete(EqualTo("id", 2)) delete_snapshot = table.current_snapshot() assert delete_snapshot is not None + assert delete_snapshot.sequence_number is not None # The position-delete file's sequence number must equal the delete snapshot's sequence # number. This is what scopes the delete to pre-existing data files only: a positional diff --git a/tests/table/test_mor_row_delta_producer.py b/tests/table/test_mor_row_delta_producer.py index 18fb4c9861..1f1dc5f2e6 100644 --- a/tests/table/test_mor_row_delta_producer.py +++ b/tests/table/test_mor_row_delta_producer.py @@ -251,6 +251,7 @@ def test_row_delta_position_delete_end_to_end(catalog: Catalog) -> None: current_snapshot = table.current_snapshot() assert current_snapshot is not None + assert current_snapshot.summary is not None assert current_snapshot.summary.operation == Operation.OVERWRITE reloaded = catalog.load_table(identifier) @@ -292,6 +293,7 @@ def test_position_delete_sequence_number_does_not_affect_later_appends(catalog: data_file = _append_initial_rows(table) first_snapshot = table.current_snapshot() assert first_snapshot is not None + assert first_snapshot.sequence_number is not None first_sequence_number = first_snapshot.sequence_number delete_file = write_position_delete_file( diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index b1e26c62ba..2cebf59daf 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,6 +16,7 @@ # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory +from typing import Any, cast import fastavro import pytest @@ -997,7 +998,7 @@ def test_positional_delete_schema_pos_serializes_as_avro_long() -> None: from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA from pyiceberg.utils.schema_conversion import AvroSchemaConversion - avro_schema = AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete") + avro_schema = cast(dict[str, Any], AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete")) pos_field = next(field for field in avro_schema["fields"] if field["name"] == "pos") assert "long" in pos_field["type"]