diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 076098c757..a34f18f0e3 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -208,6 +208,11 @@ T = TypeVar("T") +POSITIONAL_DELETE_WRITE_SCHEMA = Schema( + NestedField(2147483546, "file_path", StringType()), + NestedField(2147483545, "pos", LongType()), +) + @lru_cache def _cached_resolve_s3_region(bucket: str) -> str | None: @@ -2685,6 +2690,81 @@ def write_parquet(task: WriteTask) -> DataFile: return iter(data_files) +def _write_position_delete_file( + io: FileIO, + table_metadata: TableMetadata, + write_uuid: uuid.UUID, + counter: itertools.count[int], + partition: Record, + spec_id: int, + delete_rows: Iterable[tuple[str, int]], +) -> DataFile: + """Write one v2 position-delete Parquet file for a single partition.""" + rows = sorted(delete_rows) + if not rows: + raise ValueError("Cannot write an empty position-delete file") + + parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) + row_group_size = property_as_int( + properties=table_metadata.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT, + default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT, + ) + arrow_schema = pa.schema( + [ + pa.field("file_path", pa.string(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"}), + pa.field("pos", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"}), + ] + ) + arrow_table = pa.Table.from_arrays( + [ + pa.array([file_path for file_path, _ in rows], type=pa.string()), + pa.array([pos for _, pos in rows], type=pa.int64()), + ], + schema=arrow_schema, + ) + + location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) + file_path = location_provider.new_data_location( + data_file_name=f"00000-{next(counter)}-{write_uuid}-deletes.parquet", + ) + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with 
pq.ParquetWriter(fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs) as writer: + writer.write(arrow_table, row_group_size=row_group_size) + + stats_columns = compute_statistics_plan(POSITIONAL_DELETE_WRITE_SCHEMA, table_metadata.properties) + # The reader routes a position-delete file to a data file by its exact file_path bound, so the + # bound must be the full, untruncated path rather than the default truncate(16) string metric. + stats_columns[2147483546] = StatisticsCollector( + field_id=2147483546, + iceberg_type=StringType(), + mode=MetricsMode(MetricModeTypes.FULL), + column_name="file_path", + ) + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=writer.writer.metadata, + stats_columns=stats_columns, + parquet_column_mapping=parquet_path_to_id_mapping(POSITIONAL_DELETE_WRITE_SCHEMA), + ) + + data_file = DataFile.from_args( + _table_format_version=table_metadata.format_version, + content=DataFileContent.POSITION_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=partition, + file_size_in_bytes=len(fo), + sort_order_id=None, + spec_id=spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + data_file.spec_id = spec_id + return data_file + + def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]: """Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``. 
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..9f94607262 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -774,9 +774,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} -POSITIONAL_DELETE_SCHEMA = Schema( - NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType()) -) +POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType())) class ManifestFile(Record): @@ -1214,11 +1212,13 @@ def __init__( output_file: OutputFile, snapshot_id: int, avro_compression: AvroCompressionCodec, + content: ManifestContent = ManifestContent.DATA, ): super().__init__(spec, schema, output_file, snapshot_id, avro_compression) + self._content = content def content(self) -> ManifestContent: - return ManifestContent.DATA + return self._content @property def version(self) -> TableVersion: @@ -1228,7 +1228,7 @@ def version(self) -> TableVersion: def _meta(self) -> dict[str, str]: return { **super()._meta, - "content": "data", + "content": "deletes" if self._content == ManifestContent.DELETES else "data", } def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: @@ -1247,11 +1247,14 @@ def write_manifest( output_file: OutputFile, snapshot_id: int, avro_compression: AvroCompressionCodec, + content: ManifestContent = ManifestContent.DATA, ) -> ManifestWriter: if format_version == 1: + if content != ManifestContent.DATA: + raise ValidationError("Cannot write delete manifests for a v1 table") return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: - return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) + return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression, content) 
def _delete_merge_on_read_position_deletes(
    self,
    delete_filter: BooleanExpression,
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """Delete the rows matching ``delete_filter`` by writing position-delete files.

    The data files selected by the filter are planned, then each one is re-read
    without a row filter and without applying delete files, so every row's ordinal
    position inside its file can be numbered. Rows that satisfy the filter and are
    not already listed in an existing position delete are grouped by
    ``(spec_id, partition)``; one position-delete file is written per group and all
    of them are appended within a single delete snapshot.

    Args:
        delete_filter: Expression selecting the rows to delete.
        snapshot_properties: Custom properties passed to the snapshot update.
        case_sensitive: Whether column names in the filter are matched case-sensitively.
        branch: Branch to scan and commit to; when ``None`` the scan is not pinned to a ref.

    Warns:
        UserWarning: When no row needs deleting; nothing is committed in that case.
    """
    import pyarrow as pa

    from pyiceberg.io.pyarrow import (
        ArrowScan,
        _read_all_delete_files,
        _write_position_delete_file,
        expression_to_pyarrow,
    )

    planner = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive)
    planned_tasks = list(planner.use_ref(branch).plan_files() if branch is not None else planner.plan_files())

    arrow_predicate = expression_to_pyarrow(
        bind(self.table_metadata.schema(), delete_filter, case_sensitive),
        self.table_metadata.schema(),
    )
    known_deletes = _read_all_delete_files(self._table.io, planned_tasks)
    pending: dict[tuple[int, Record], list[tuple[str, int]]] = {}

    # Reads whole data files: positions are only meaningful when no row is skipped.
    unfiltered_reader = ArrowScan(
        table_metadata=self.table_metadata,
        io=self._table.io,
        projected_schema=self.table_metadata.schema(),
        row_filter=AlwaysTrue(),
        case_sensitive=case_sensitive,
    )

    for planned in planned_tasks:
        data_file = planned.file
        already_deleted: set[int] = {
            int(position)
            for chunk in known_deletes.get(data_file.file_path, [])
            for position in chunk.to_pylist()
        }

        offset = 0
        bare_task = FileScanTask(data_file, delete_files=set(), residual=AlwaysTrue())
        for batch in unfiltered_reader.to_record_batches([bare_task]):
            # Number the rows of this batch with their position in the data file.
            ordinals = pa.array(range(offset, offset + batch.num_rows), type=pa.int64())
            offset += batch.num_rows

            numbered = pa.Table.from_batches([batch]).append_column("__pyiceberg_position", ordinals)
            hits = numbered.filter(arrow_predicate).column("__pyiceberg_position").to_pylist()
            fresh = [(data_file.file_path, hit) for hit in hits if hit not in already_deleted]
            if not fresh:
                continue

            pending.setdefault((data_file.spec_id, data_file.partition), []).extend(fresh)

    if not pending:
        warnings.warn("Delete operation did not match any records", stacklevel=2)
        return

    with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
        file_counter = itertools.count(0)
        for (spec_id, partition), positions in pending.items():
            written = _write_position_delete_file(
                io=self._table.io,
                table_metadata=self.table_metadata,
                write_uuid=delete_snapshot.commit_uuid,
                counter=file_counter,
                partition=partition,
                spec_id=spec_id,
                delete_rows=positions,
            )
            delete_snapshot.append_delete_file(written)
for delete_file in delete_files: + writer.add( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=self._snapshot_id, + sequence_number=None, + file_sequence_number=None, + data_file=delete_file, + ) + ) + delete_manifests.append(writer.to_manifest_file()) + return delete_manifests + else: + return [] + def _write_delete_manifest() -> list[ManifestFile]: # Check if we need to mark the files as deleted deleted_entries = self._deleted_entries() @@ -218,10 +247,13 @@ def _write_delete_manifest() -> list[ManifestFile]: executor = ExecutorFactory.get_or_create() added_manifests = executor.submit(_write_added_manifest) + added_delete_manifests = executor.submit(_write_added_delete_manifest) delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) + return self._process_manifests( + added_manifests.result() + added_delete_manifests.result() + delete_manifests.result() + existing_manifests.result() + ) def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: from pyiceberg.table import TableProperties @@ -230,6 +262,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: table_metadata = self._transaction.table_metadata schema = table_metadata.schema() default_spec = table_metadata.spec() + specs = table_metadata.specs() partition_summary_limit = int( table_metadata.properties.get( @@ -244,9 +277,14 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: partition_spec=default_spec, schema=schema, ) + for delete_file in self._added_delete_files: + ssc.add_file( + data_file=delete_file, + partition_spec=specs[delete_file.spec_id], + schema=schema, + ) if len(self._deleted_data_files) > 0: - specs = table_metadata.specs() for data_file in self._deleted_data_files: ssc.remove_file( 
def _commit(self) -> UpdatesAndRequirements:
    """Commit only when data files are affected or delete files were added.

    Returns:
        The parent producer's updates and requirements, or a pair of empty
        tuples when there is nothing to commit.
    """
    # Only produce a commit when there is something to delete
    nothing_to_commit = not (self.files_affected or self._added_delete_files)
    if nothing_to_commit:
        return (), ()
    return super()._commit()


def append_delete_file(self, delete_file: DataFile) -> _DeleteFiles:
    """Register an added delete file and switch this producer to OVERWRITE.

    Args:
        delete_file: The delete file to add to the snapshot.

    Returns:
        This producer, so calls can be chained.
    """
    self._operation = Operation.OVERWRITE
    super().append_delete_file(delete_file)
    return self
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import pyarrow as pa
import pyarrow.parquet as pq
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.expressions import AlwaysTrue, EqualTo, In
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, TableProperties
from pyiceberg.table.snapshots import Operation
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import LongType, NestedField, StringType

# Table schema shared by every test: a required id plus two optional strings.
ICEBERG_SCHEMA = Schema(
    NestedField(1, "id", LongType(), required=True),
    NestedField(2, "category", StringType(), required=False),
    NestedField(3, "payload", StringType(), required=False),
)
# Arrow counterpart of ICEBERG_SCHEMA used to build the appended tables.
ARROW_SCHEMA = pa.schema(
    [
        pa.field("id", pa.int64(), nullable=False),
        pa.field("category", pa.string(), nullable=True),
        pa.field("payload", pa.string(), nullable=True),
    ]
)
# Default table properties: format version 2 with merge-on-read deletes.
MOR_PROPERTIES = {
    TableProperties.FORMAT_VERSION: "2",
    TableProperties.DELETE_MODE: TableProperties.DELETE_MODE_MERGE_ON_READ,
}


def _ensure_namespace(catalog: Catalog) -> None:
    """Create the ``default`` namespace when the catalog does not have it yet."""
    if not catalog.namespace_exists("default"):
        catalog.create_namespace("default")


def _create_table(
    catalog: Catalog,
    name: str,
    properties: dict[str, str] | None = None,
    partition_spec: PartitionSpec | None = None,
) -> Table:
    """Create ``default.<name>`` with ICEBERG_SCHEMA.

    ``properties`` falls back to MOR_PROPERTIES and ``partition_spec`` to an empty
    ``PartitionSpec()`` only when the argument is ``None``; an explicitly passed
    empty dict is forwarded as-is instead of being replaced by the defaults.
    """
    _ensure_namespace(catalog)
    return catalog.create_table(
        f"default.{name}",
        ICEBERG_SCHEMA,
        partition_spec=PartitionSpec() if partition_spec is None else partition_spec,
        properties=MOR_PROPERTIES if properties is None else properties,
    )


def _append_rows(table: Table, rows: list[tuple[int, str, str]]) -> None:
    """Append ``(id, category, payload)`` tuples to ``table`` in one append."""
    table.append(
        pa.Table.from_pylist(
            [{"id": row[0], "category": row[1], "payload": row[2]} for row in rows],
            schema=ARROW_SCHEMA,
        )
    )


def _visible_rows(table: Table) -> list[tuple[int, str, str]]:
    """Return the rows a plain scan yields, ordered by ``id``."""
    rows = table.scan().to_arrow().sort_by([("id", "ascending")]).to_pylist()
    return [(row["id"], row["category"], row["payload"]) for row in rows]


def _content_files(table: Table, content: DataFileContent) -> list[DataFile]:
    """Return the current snapshot's manifest-entry files with the given content type."""
    snapshot = table.current_snapshot()
    if snapshot is None:
        return []

    return [
        entry.data_file
        for manifest in snapshot.manifests(table.io)
        for entry in manifest.fetch_manifest_entry(table.io)
        if entry.data_file.content == content
    ]


def _data_file_paths(table: Table) -> list[str]:
    """Return the sorted paths of the data files in the current snapshot."""
    return sorted(file.file_path for file in _content_files(table, DataFileContent.DATA))


def _position_delete_files(table: Table) -> list[DataFile]:
    """Return the position-delete files in the current snapshot."""
    return _content_files(table, DataFileContent.POSITION_DELETES)


def _delete_manifests(table: Table) -> list[ManifestFile]:
    """Return the current snapshot's manifests whose content is DELETES."""
    snapshot = table.current_snapshot()
    assert snapshot is not None
    return [manifest for manifest in snapshot.manifests(table.io) if manifest.content == ManifestContent.DELETES]


def test_single_row_mor_delete(catalog: Catalog) -> None:
    """One matching row yields one delete file and manifest; data files are untouched."""
    table = _create_table(catalog, "single_row")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three")])
    data_paths_before = _data_file_paths(table)

    table.delete(EqualTo("id", 2))

    assert _visible_rows(table) == [(1, "a", "one"), (3, "a", "three")]
    assert _data_file_paths(table) == data_paths_before
    snapshot = table.current_snapshot()
    assert snapshot is not None
    assert snapshot.summary is not None
    assert snapshot.summary.operation == Operation.OVERWRITE

    delete_files = _position_delete_files(table)
    assert len(delete_files) == 1
    assert delete_files[0].record_count == 1
    assert len(_delete_manifests(table)) == 1

    with table.io.new_input(delete_files[0].file_path).open() as input_file:
        delete_schema = pq.read_schema(input_file)

    assert delete_schema.field("file_path").type == pa.string()
    assert not delete_schema.field("file_path").nullable
    assert delete_schema.field("file_path").metadata[b"PARQUET:field_id"] == b"2147483546"
    assert delete_schema.field("pos").type == pa.int64()
    assert not delete_schema.field("pos").nullable
    assert delete_schema.field("pos").metadata[b"PARQUET:field_id"] == b"2147483545"


def test_delete_file_path_bound_is_full_untruncated_path(catalog: Catalog) -> None:
    """The file_path lower and upper bounds both equal the full data-file path."""
    table = _create_table(catalog, "path_bound")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three")])

    table.delete(EqualTo("id", 2))

    delete_files = _position_delete_files(table)
    assert len(delete_files) == 1
    delete_file = delete_files[0]
    data_path = _data_file_paths(table)[0]

    path_field_id = 2147483546
    lower = delete_file.lower_bounds[path_field_id].decode("utf-8")
    upper = delete_file.upper_bounds[path_field_id].decode("utf-8")
    # The reader only routes through the exact-path delete index when lower == upper == the full
    # data-file path; a truncated string bound would silently fall back to the partition index.
    assert lower == upper == data_path


def test_multi_row_mor_delete(catalog: Catalog) -> None:
    """Two matching rows in one data file land in a single delete file."""
    table = _create_table(catalog, "multi_row")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three"), (4, "a", "four")])
    data_paths_before = _data_file_paths(table)

    table.delete(In("id", [2, 4]))

    assert _visible_rows(table) == [(1, "a", "one"), (3, "a", "three")]
    assert _data_file_paths(table) == data_paths_before
    delete_files = _position_delete_files(table)
    assert len(delete_files) == 1
    assert delete_files[0].record_count == 2


def test_empty_match_warns_and_writes_nothing(catalog: Catalog) -> None:
    """A filter matching no rows warns and leaves the current snapshot unchanged."""
    table = _create_table(catalog, "empty_match")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three")])
    snapshot_before = table.current_snapshot()
    assert snapshot_before is not None

    with pytest.warns(UserWarning, match="Delete operation did not match any records"):
        table.delete(EqualTo("id", 999))

    snapshot_after = table.current_snapshot()
    assert snapshot_after is not None
    assert snapshot_after.snapshot_id == snapshot_before.snapshot_id
    assert _position_delete_files(table) == []
    assert _delete_manifests(table) == []


def test_delete_all_rows_writes_full_delete_file_not_drop(catalog: Catalog) -> None:
    """Deleting every row keeps the data file and writes a delete file covering all rows."""
    table = _create_table(catalog, "delete_all")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three")])
    data_paths_before = _data_file_paths(table)

    table.delete(AlwaysTrue())

    assert _data_file_paths(table) == data_paths_before
    delete_files = _position_delete_files(table)
    assert len(delete_files) == 1
    assert delete_files[0].record_count == 3
    assert table.scan().to_arrow().num_rows == 0


def test_multiple_data_files_one_partition(catalog: Catalog) -> None:
    """Deletes spanning two data files of an unpartitioned table share one delete file."""
    table = _create_table(catalog, "multiple_data_files_one_partition")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two")])
    _append_rows(table, [(3, "b", "three"), (4, "b", "four")])
    data_paths_before = _data_file_paths(table)
    assert len(data_paths_before) == 2

    table.delete(In("id", [2, 3]))

    assert _data_file_paths(table) == data_paths_before
    delete_files = _position_delete_files(table)
    assert len(delete_files) == 1
    assert delete_files[0].record_count == 2

    with table.io.new_input(delete_files[0].file_path).open() as input_file:
        delete_table = pq.read_table(input_file)

    assert set(delete_table["file_path"].to_pylist()) == set(data_paths_before)
    assert len(set(delete_table["file_path"].to_pylist())) == 2
    assert sorted(delete_table["pos"].to_pylist()) == [0, 1]
    assert _visible_rows(table) == [(1, "a", "one"), (4, "b", "four")]


def test_partitioned_mor_delete_writes_delete_files_per_partition(catalog: Catalog) -> None:
    """Each partition containing a deleted row gets its own delete file."""
    spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category"))
    table = _create_table(catalog, "partitioned", partition_spec=spec)
    _append_rows(
        table,
        [(1, "a", "one"), (2, "a", "two"), (3, "b", "three"), (4, "b", "four"), (5, "c", "five")],
    )
    data_partitions = {file.partition for file in _content_files(table, DataFileContent.DATA)}
    data_paths_before = _data_file_paths(table)

    table.delete(In("id", [2, 4]))

    assert _visible_rows(table) == [(1, "a", "one"), (3, "b", "three"), (5, "c", "five")]
    assert _data_file_paths(table) == data_paths_before
    delete_files = _position_delete_files(table)
    assert len(delete_files) == 2
    assert {file.partition[0] for file in delete_files} == {"a", "b"}
    assert {file.partition for file in delete_files}.issubset(data_partitions)
    assert sorted(file.record_count for file in delete_files) == [1, 1]


def test_sequence_number_scoping_does_not_delete_later_appends(catalog: Catalog) -> None:
    """Rows appended after a delete stay visible even when they match the old filter."""
    properties = {**MOR_PROPERTIES, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: "1"}
    table = _create_table(catalog, "sequence_scoping", properties=properties)
    _append_rows(table, [(1, "delete", "a1"), (2, "keep", "a2"), (3, "delete", "a3")])

    table.delete(EqualTo("category", "delete"))
    delete_files_after_delete = _position_delete_files(table)
    assert len(delete_files_after_delete) == 1
    assert delete_files_after_delete[0].record_count == 2

    _append_rows(table, [(4, "delete", "b1"), (5, "keep", "b2")])

    assert _visible_rows(table) == [(2, "keep", "a2"), (4, "delete", "b1"), (5, "keep", "b2")]


def test_multiple_successive_mor_deletes(catalog: Catalog) -> None:
    """A second delete records only rows not already covered by the first."""
    table = _create_table(catalog, "successive")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three"), (4, "a", "four")])

    table.delete(EqualTo("id", 2))
    table.delete(In("id", [2, 4]))

    assert _visible_rows(table) == [(1, "a", "one"), (3, "a", "three")]
    delete_files = _position_delete_files(table)
    assert len(delete_files) == 2
    assert sorted(file.record_count for file in delete_files) == [1, 1]


def test_plain_scan_to_arrow_after_mor_delete(catalog: Catalog) -> None:
    """A plain ``scan().to_arrow()`` no longer returns the deleted row."""
    table = _create_table(catalog, "plain_scan")
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three")])

    table.delete(EqualTo("id", 3))

    result = table.scan().to_arrow().sort_by([("id", "ascending")])
    assert result["id"].to_pylist() == [1, 2]
    assert result["payload"].to_pylist() == ["one", "two"]


def test_copy_on_write_default_still_rewrites_data_files(catalog: Catalog) -> None:
    """Without the merge-on-read property a delete rewrites data files and writes no delete files."""
    table = _create_table(catalog, "cow_default", properties={TableProperties.FORMAT_VERSION: "2"})
    _append_rows(table, [(1, "a", "one"), (2, "a", "two"), (3, "a", "three")])
    data_paths_before = _data_file_paths(table)

    table.delete(EqualTo("id", 2))

    assert _visible_rows(table) == [(1, "a", "one"), (3, "a", "three")]
    assert _data_file_paths(table) != data_paths_before
    assert _position_delete_files(table) == []