From 3ba5e75a1511ab81ab786950d0b0aa04f8c5192d Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 18:21:16 -0500 Subject: [PATCH 1/5] Add position-delete writer + row-delta snapshot producer Introduces the merge-on-read write primitives so a v2 table can commit position deletes without rewriting data files: - manifest.py: ManifestWriterV2 / write_manifest accept a ManifestContent so deletes manifests can be written (v1 rejects delete manifests). - io/pyarrow.py: write_position_delete_file writes one spec-compliant v2 position-delete Parquet for a single referenced data file (sorted, deduped long positions, full untruncated file_path bound, content=POSITION_DELETES, equality_ids=None, partition/spec_id copied from the data file). - table/update/snapshot.py: _SnapshotProducer gains append_delete_file and writes a deletes manifest per spec; a new _RowDeltaFiles producer (exposed via UpdateSnapshot.row_delta(), aliased RowDeltaSnapshotProducer) commits appended data files AND delete files as one OVERWRITE row-delta snapshot that the existing positional-delete reader applies. Delete-file sequence numbers stay unassigned and are stamped at commit, so delete seq == data seq == snapshot seq. Data files are never rewritten. --- pyiceberg/io/pyarrow.py | 88 ++++++++ pyiceberg/manifest.py | 11 +- pyiceberg/table/update/snapshot.py | 92 +++++++- tests/table/test_mor_row_delta_producer.py | 249 +++++++++++++++++++++ 4 files changed, 434 insertions(+), 6 deletions(-) create mode 100644 tests/table/test_mor_row_delta_producer.py diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 076098c757..ee08f6fc6d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -208,6 +208,11 @@ T = TypeVar("T") +POSITIONAL_DELETE_WRITE_SCHEMA = Schema( + NestedField(2147483546, "file_path", StringType()), + NestedField(2147483545, "pos", LongType()), +) + @lru_cache def _cached_resolve_s3_region(bucket: str) -> str | None: @@ -2685,6 +2690,89 @@ def write_parquet(task: WriteTask) -> DataFile: return iter(data_files) +def write_position_delete_file( + io: FileIO, + table_metadata: TableMetadata, + referenced_data_file: DataFile, + positions: Iterable[int], + write_uuid: uuid.UUID | None = None, + counter: itertools.count[int] | None = None, +) -> DataFile: + """Write one Iceberg v2 position-delete Parquet file for a single referenced data file. + + The delete file's `file_path` column is the constant referenced_data_file.file_path + (verbatim, including any scheme), and `pos` are the deleted row positions, deduplicated + and sorted ascending (spec sort order is (file_path, pos); file_path is constant here). + The returned DataFile has content=POSITION_DELETES, equality_ids=None, partition and + spec_id copied from the referenced data file so it is partition-scoped, and record_count + equal to the number of deleted positions. + """ + write_uuid = write_uuid or uuid.uuid4() + counter = counter or itertools.count(0) + + delete_positions = sorted({int(position) for position in positions}) + if not delete_positions: + raise ValueError("Cannot write an empty position-delete file") + + parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) + row_group_size = property_as_int( + properties=table_metadata.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT, + default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT, + ) + arrow_schema = pa.schema( + [ + pa.field("file_path", pa.string(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"}), + pa.field("pos", pa.int64(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"}), + ] + ) + arrow_table = pa.Table.from_arrays( + [ + pa.array([referenced_data_file.file_path] * len(delete_positions), type=pa.string()), + pa.array(delete_positions, type=pa.int64()), + ], + schema=arrow_schema, + ) + + location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) + file_path = location_provider.new_data_location( + data_file_name=f"00000-{next(counter)}-{write_uuid}-deletes.parquet", + ) + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs) as writer: + writer.write(arrow_table, row_group_size=row_group_size) + + stats_columns = compute_statistics_plan(POSITIONAL_DELETE_WRITE_SCHEMA, table_metadata.properties) + stats_columns[2147483546] = StatisticsCollector( + field_id=2147483546, + iceberg_type=StringType(), + mode=MetricsMode(MetricModeTypes.FULL), + column_name="file_path", + ) + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=writer.writer.metadata, + stats_columns=stats_columns, + parquet_column_mapping=parquet_path_to_id_mapping(POSITIONAL_DELETE_WRITE_SCHEMA), + ) + + data_file = DataFile.from_args( + _table_format_version=table_metadata.format_version, + content=DataFileContent.POSITION_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=referenced_data_file.partition, + file_size_in_bytes=len(fo), + sort_order_id=None, + spec_id=referenced_data_file.spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + data_file.spec_id = referenced_data_file.spec_id + return data_file + + def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]: """Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``. diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 434ffae623..9f94607262 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -1212,11 +1212,13 @@ def __init__( output_file: OutputFile, snapshot_id: int, avro_compression: AvroCompressionCodec, + content: ManifestContent = ManifestContent.DATA, ): super().__init__(spec, schema, output_file, snapshot_id, avro_compression) + self._content = content def content(self) -> ManifestContent: - return ManifestContent.DATA + return self._content @property def version(self) -> TableVersion: @@ -1226,7 +1228,7 @@ def version(self) -> TableVersion: def _meta(self) -> dict[str, str]: return { **super()._meta, - "content": "data", + "content": "deletes" if self._content == ManifestContent.DELETES else "data", } def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: @@ -1245,11 +1247,14 @@ def write_manifest( output_file: OutputFile, snapshot_id: int, avro_compression: AvroCompressionCodec, + content: ManifestContent = ManifestContent.DATA, ) -> ManifestWriter: if format_version == 1: + if content != ManifestContent.DATA: + raise ValidationError("Cannot write delete manifests for a v1 table") return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: - return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) + return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression, content) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 7931edacdd..1b8f4eb7fd 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -98,6 +98,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _snapshot_id: int _parent_snapshot_id: int | None _added_data_files: list[DataFile] + _added_delete_files: list[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: set[DataFile] _compression: AvroCompressionCodec @@ -120,6 +121,7 @@ def __init__( self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() self._added_data_files = [] + self._added_delete_files = [] self._deleted_data_files = set() self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) @@ -148,6 +150,10 @@ def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) return self + def append_delete_file(self, delete_file: DataFile) -> _SnapshotProducer[U]: + self._added_delete_files.append(delete_file) + return self + def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._deleted_data_files.add(data_file) return self @@ -195,6 +201,29 @@ def _write_added_manifest() -> list[ManifestFile]: else: return [] + def _write_added_delete_manifest() -> list[ManifestFile]: + if self._added_delete_files: + added_delete_manifests = [] + partition_groups: dict[int, list[DataFile]] = defaultdict(list) + for delete_file in self._added_delete_files: + partition_groups[delete_file.spec_id].append(delete_file) + for spec_id, delete_files in partition_groups.items(): + with self.new_manifest_writer(spec=self.spec(spec_id), content=ManifestContent.DELETES) as writer: + for delete_file in delete_files: + writer.add( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=self._snapshot_id, + sequence_number=None, + file_sequence_number=None, + data_file=delete_file, + ) + ) + added_delete_manifests.append(writer.to_manifest_file()) + return added_delete_manifests + else: + return [] + def _write_delete_manifest() -> list[ManifestFile]: # Check if we need to mark the files as deleted deleted_entries = self._deleted_entries() @@ -218,10 +247,13 @@ def _write_delete_manifest() -> list[ManifestFile]: executor = ExecutorFactory.get_or_create() added_manifests = executor.submit(_write_added_manifest) + added_delete_manifests = executor.submit(_write_added_delete_manifest) delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) + return self._process_manifests( + added_manifests.result() + added_delete_manifests.result() + delete_manifests.result() + existing_manifests.result() + ) def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: from pyiceberg.table import TableProperties @@ -245,8 +277,15 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: schema=schema, ) + specs = table_metadata.specs() + for delete_file in self._added_delete_files: + ssc.add_file( + data_file=delete_file, + partition_spec=specs[delete_file.spec_id], + schema=schema, + ) + if len(self._deleted_data_files) > 0: - specs = table_metadata.specs() for data_file in self._deleted_data_files: ssc.remove_file( data_file=data_file, @@ -339,7 +378,7 @@ def schema(self) -> Schema: def spec(self, spec_id: int) -> PartitionSpec: return self._transaction.table_metadata.specs()[spec_id] - def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: + def new_manifest_writer(self, spec: PartitionSpec, content: ManifestContent = ManifestContent.DATA) -> ManifestWriter: return write_manifest( format_version=self._transaction.table_metadata.format_version, spec=spec, @@ -347,6 +386,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, avro_compression=self._compression, + content=content, ) def new_manifest_output(self) -> OutputFile: @@ -529,6 +569,42 @@ def _deleted_entries(self) -> list[ManifestEntry]: return [] +class _RowDeltaFiles(_SnapshotProducer["_RowDeltaFiles"]): + """Commits appended data files AND position-delete files as ONE row-delta (OVERWRITE) snapshot. + + Existing manifests are carried forward untouched (no copy-on-write rewrite); the existing + positional-delete reader applies the new delete files. Delete-file sequence numbers are left + unassigned and stamped at commit, so delete seq == data seq == snapshot seq (a true row delta). + """ + + def _existing_manifests(self) -> list[ManifestFile]: + """Carry forward all existing manifests from the parent snapshot, like _FastAppendFiles.""" + existing_manifests = [] + + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + + if previous_snapshot is None: + raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}") + + for manifest in previous_snapshot.manifests(io=self._io): + if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id: + existing_manifests.append(manifest) + + return existing_manifests + + def _deleted_entries(self) -> list[ManifestEntry]: + return [] + + def _commit(self) -> UpdatesAndRequirements: + if self._added_data_files or self._added_delete_files: + return super()._commit() + return (), () + + +RowDeltaSnapshotProducer = _RowDeltaFiles + + class _MergeAppendFiles(_FastAppendFiles): _target_size_bytes: int _min_count_to_merge: int @@ -720,6 +796,16 @@ def overwrite(self, commit_uuid: uuid.UUID | None = None) -> _OverwriteFiles: snapshot_properties=self._snapshot_properties, ) + def row_delta(self, commit_uuid: uuid.UUID | None = None) -> _RowDeltaFiles: + return _RowDeltaFiles( + operation=Operation.OVERWRITE, + transaction=self._transaction, + io=self._io, + commit_uuid=commit_uuid, + snapshot_properties=self._snapshot_properties, + branch=self._branch, + ) + def delete(self) -> _DeleteFiles: return _DeleteFiles( operation=Operation.DELETE, diff --git a/tests/table/test_mor_row_delta_producer.py b/tests/table/test_mor_row_delta_producer.py new file mode 100644 index 0000000000..490f1aff0b --- /dev/null +++ b/tests/table/test_mor_row_delta_producer.py @@ -0,0 +1,249 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import itertools +import uuid + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.conversions import from_bytes, to_bytes +from pyiceberg.exceptions import NamespaceAlreadyExistsError +from pyiceberg.io.pyarrow import PYARROW_PARQUET_FIELD_ID_KEY, write_position_delete_file +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation +from pyiceberg.types import LongType, NestedField, StringType + +ICEBERG_SCHEMA = Schema( + NestedField(1, "id", LongType(), required=True), + NestedField(2, "data", StringType(), required=True), +) +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("data", pa.string(), nullable=False), + ] +) + + +def _arrow_table(rows: list[dict[str, object]]) -> pa.Table: + return pa.Table.from_pylist(rows, schema=ARROW_SCHEMA) + + +def _create_v2_table(catalog: Catalog, identifier: str) -> Table: + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + return catalog.create_table(identifier=identifier, schema=ICEBERG_SCHEMA, properties={"format-version": "2"}) + + +def _append_initial_rows(table: Table) -> DataFile: + table.append( + _arrow_table( + [ + {"id": 1, "data": "a"}, + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + {"id": 4, "data": "d"}, + ] + ) + ) + data_files = _current_data_files(table) + assert len(data_files) == 1 + return data_files[0] + + +def _current_data_files(table: Table) -> list[DataFile]: + snapshot = table.current_snapshot() + assert snapshot is not None + return [ + entry.data_file + for manifest in snapshot.manifests(io=table.io) + if manifest.content == ManifestContent.DATA + for entry in manifest.fetch_manifest_entry(io=table.io) + if entry.data_file.content == DataFileContent.DATA + ] + + +def _current_manifests_with_content(table: Table, content: ManifestContent) -> list[ManifestFile]: + snapshot = table.current_snapshot() + assert snapshot is not None + return [manifest for manifest in snapshot.manifests(io=table.io) if manifest.content == content] + + +def _read_delete_parquet(table: Table, delete_file: DataFile) -> tuple[pa.Schema, pa.Table]: + with table.io.new_input(delete_file.file_path).open() as input_file: + parquet_file = pq.ParquetFile(input_file) + return parquet_file.schema_arrow, parquet_file.read() + + +def _rows(table: Table) -> list[dict[str, object]]: + return sorted(table.scan().to_arrow().to_pylist(), key=lambda row: row["id"]) + + +def _commit_delete_file(table: Table, delete_file: DataFile) -> None: + with table.transaction() as tx: + with tx.update_snapshot().row_delta() as row_delta: + row_delta.append_delete_file(delete_file) + + +def test_write_position_delete_file(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file") + data_file = _append_initial_rows(table) + + delete_file = write_position_delete_file(table.io, table.metadata, data_file, [0, 2]) + + assert delete_file.content == DataFileContent.POSITION_DELETES + assert delete_file.equality_ids is None + assert delete_file.record_count == 2 + assert delete_file.partition == data_file.partition + assert delete_file.spec_id == data_file.spec_id + assert delete_file.lower_bounds[2147483546] == to_bytes(StringType(), data_file.file_path) + assert delete_file.upper_bounds[2147483546] == to_bytes(StringType(), data_file.file_path) + + schema, rows = _read_delete_parquet(table, delete_file) + assert schema.field("file_path").metadata == {PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"} + assert schema.field("file_path").type == pa.string() + assert schema.field("pos").metadata == {PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"} + assert schema.field("pos").type == pa.int64() + assert rows.column("pos").to_pylist() == [0, 2] + assert rows.column("file_path").to_pylist() == [data_file.file_path, data_file.file_path] + + +def test_write_position_delete_file_empty_positions_raises(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file_empty_positions") + data_file = _append_initial_rows(table) + + with pytest.raises(ValueError, match="Cannot write an empty position-delete file"): + write_position_delete_file(table.io, table.metadata, data_file, []) + + +def test_write_position_delete_file_dedupes_and_sorts_positions(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file_dedupes") + data_file = _append_initial_rows(table) + + delete_file = write_position_delete_file(table.io, table.metadata, data_file, [2, 0, 2, 1]) + + assert delete_file.record_count == 3 + _, rows = _read_delete_parquet(table, delete_file) + assert rows.column("pos").to_pylist() == [0, 1, 2] + + +def test_row_delta_position_delete_end_to_end(catalog: Catalog) -> None: + identifier = "default.test_row_delta_position_delete_end_to_end" + table = _create_v2_table(catalog, identifier) + data_file = _append_initial_rows(table) + before_data_paths = {data_file.file_path for data_file in _current_data_files(table)} + + delete_file = write_position_delete_file(table.io, table.metadata, data_file, [0, 2]) + _commit_delete_file(table, delete_file) + + assert _rows(table) == [{"id": 2, "data": "b"}, {"id": 4, "data": "d"}] + assert {data_file.file_path for data_file in _current_data_files(table)} == before_data_paths + assert len(_current_manifests_with_content(table, ManifestContent.DELETES)) == 1 + + current_snapshot = table.current_snapshot() + assert current_snapshot is not None + assert current_snapshot.summary.operation == Operation.OVERWRITE + + reloaded = catalog.load_table(identifier) + assert _rows(reloaded) == [{"id": 2, "data": "b"}, {"id": 4, "data": "d"}] + + +def test_write_position_delete_file_accepts_large_position(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file_accepts_large_position") + data_file = _append_initial_rows(table) + large_position = 2**31 + 5 + + delete_file = write_position_delete_file(table.io, table.metadata, data_file, [large_position]) + + assert delete_file.record_count == 1 + _, rows = _read_delete_parquet(table, delete_file) + assert rows.column("pos").to_pylist() == [large_position] + + +def test_position_delete_sequence_number_does_not_affect_later_appends(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_position_delete_sequence_number_scoping") + data_file = _append_initial_rows(table) + first_snapshot = table.current_snapshot() + assert first_snapshot is not None + first_sequence_number = first_snapshot.sequence_number + + delete_file = write_position_delete_file( + table.io, + table.metadata, + data_file, + [0], + write_uuid=uuid.uuid4(), + counter=itertools.count(0), + ) + _commit_delete_file(table, delete_file) + row_delta_snapshot = table.current_snapshot() + assert row_delta_snapshot is not None + row_delta_sequence_number = row_delta_snapshot.sequence_number + + table.append(_arrow_table([{"id": 5, "data": "e"}, {"id": 6, "data": "f"}])) + later_append_snapshot = table.current_snapshot() + assert later_append_snapshot is not None + later_append_sequence_number = later_append_snapshot.sequence_number + + assert row_delta_sequence_number == first_sequence_number + 1 + assert later_append_sequence_number == row_delta_sequence_number + 1 + assert _rows(table) == [ + {"id": 2, "data": "b"}, + {"id": 3, "data": "c"}, + {"id": 4, "data": "d"}, + {"id": 5, "data": "e"}, + {"id": 6, "data": "f"}, + ] + + +def test_row_delta_targets_only_referenced_file_among_many_in_one_partition(catalog: Catalog) -> None: + # Two separate data files live in the SAME (empty) partition. A position delete + # referencing file_one must remove ONLY a row of file_one; file_two must be untouched. + # This guards against silent mis-routing if the file_path bound were ever truncated + # (the DeleteFileIndex falls back to partition routing when lower != upper, which would + # match BOTH files in the same partition and delete the wrong row). + identifier = "default.test_row_delta_multi_file_one_partition" + table = _create_v2_table(catalog, identifier) + + table.append(_arrow_table([{"id": 1, "data": "a"}, {"id": 2, "data": "b"}])) + table.append(_arrow_table([{"id": 3, "data": "c"}, {"id": 4, "data": "d"}])) + + data_files = _current_data_files(table) + assert len(data_files) == 2 + # Confirm both files share the same (empty) partition, so routing cannot rely on partition. + assert data_files[0].partition == data_files[1].partition + + # file_one is the data file whose id-column lower bound is 1 (rows id=1,2). + file_one = next(df for df in data_files if from_bytes(LongType(), df.lower_bounds[1]) == 1) + before_data_paths = {df.file_path for df in data_files} + + # Delete pos 0 of file_one only (its first row, id=1). + delete_file = write_position_delete_file(table.io, table.metadata, file_one, [0]) + _commit_delete_file(table, delete_file) + + # id=1 (file_one pos 0) gone; id=3 (file_two pos 0) MUST survive. + assert _rows(table) == [{"id": 2, "data": "b"}, {"id": 3, "data": "c"}, {"id": 4, "data": "d"}] + assert {df.file_path for df in _current_data_files(table)} == before_data_paths From 51f15c137b0dfbb52754d5d91924951fa63ed172 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 20:40:23 -0500 Subject: [PATCH 2/5] Harden position-delete writer: validate input and reuse delete schema Validate referenced file is DATA and positions are non-negative, reject non-delete files in append_delete_file, fall back to default_spec_id when the referenced data file has no spec_id, and reuse POSITIONAL_DELETE_SCHEMA (now with required fields) instead of a duplicate write schema. --- pyiceberg/io/pyarrow.py | 24 ++++++---- pyiceberg/manifest.py | 5 +- pyiceberg/table/update/snapshot.py | 2 + tests/table/test_mor_row_delta_producer.py | 53 ++++++++++++++++++++++ tests/utils/test_manifest.py | 2 + 5 files changed, 76 insertions(+), 10 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index ee08f6fc6d..16ce909169 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -123,6 +123,7 @@ ) from pyiceberg.io.fileformat import DataFileStatistics as DataFileStatistics from pyiceberg.manifest import ( + POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, FileFormat, @@ -208,11 +209,6 @@ T = TypeVar("T") -POSITIONAL_DELETE_WRITE_SCHEMA = Schema( - NestedField(2147483546, "file_path", StringType()), - NestedField(2147483545, "pos", LongType()), -) - @lru_cache def _cached_resolve_s3_region(bucket: str) -> str | None: @@ -2707,12 +2703,22 @@ def write_position_delete_file( spec_id copied from the referenced data file so it is partition-scoped, and record_count equal to the number of deleted positions. """ + if referenced_data_file.content != DataFileContent.DATA: + raise ValueError(f"referenced_data_file must be a DATA file, got {referenced_data_file.content}") + + try: + spec_id = referenced_data_file.spec_id + except AttributeError: + spec_id = table_metadata.default_spec_id + write_uuid = write_uuid or uuid.uuid4() counter = counter or itertools.count(0) delete_positions = sorted({int(position) for position in positions}) if not delete_positions: raise ValueError("Cannot write an empty position-delete file") + if delete_positions[0] < 0: + raise ValueError("Position-delete positions must be non-negative") parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) row_group_size = property_as_int( @@ -2743,7 +2749,7 @@ def write_position_delete_file( with pq.ParquetWriter(fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs) as writer: writer.write(arrow_table, row_group_size=row_group_size) - stats_columns = compute_statistics_plan(POSITIONAL_DELETE_WRITE_SCHEMA, table_metadata.properties) + stats_columns = compute_statistics_plan(POSITIONAL_DELETE_SCHEMA, table_metadata.properties) stats_columns[2147483546] = StatisticsCollector( field_id=2147483546, iceberg_type=StringType(), @@ -2753,7 +2759,7 @@ def write_position_delete_file( statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=writer.writer.metadata, stats_columns=stats_columns, - parquet_column_mapping=parquet_path_to_id_mapping(POSITIONAL_DELETE_WRITE_SCHEMA), + parquet_column_mapping=parquet_path_to_id_mapping(POSITIONAL_DELETE_SCHEMA), ) data_file = DataFile.from_args( @@ -2764,12 +2770,12 @@ def write_position_delete_file( partition=referenced_data_file.partition, file_size_in_bytes=len(fo), sort_order_id=None, - spec_id=referenced_data_file.spec_id, + spec_id=spec_id, equality_ids=None, key_metadata=None, **statistics.to_serialized_dict(), ) - data_file.spec_id = referenced_data_file.spec_id + data_file.spec_id = spec_id return data_file diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 9f94607262..8e183c44f7 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -774,7 +774,10 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} -POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType())) +POSITIONAL_DELETE_SCHEMA = Schema( + NestedField(2147483546, "file_path", StringType(), required=True), + NestedField(2147483545, "pos", LongType(), required=True), +) class ManifestFile(Record): diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 1b8f4eb7fd..52ee4fb10c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -151,6 +151,8 @@ def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: return self def append_delete_file(self, delete_file: DataFile) -> _SnapshotProducer[U]: + if delete_file.content not in (DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES): + raise ValueError(f"append_delete_file requires a delete file, got {delete_file.content}") self._added_delete_files.append(delete_file) return self diff --git a/tests/table/test_mor_row_delta_producer.py b/tests/table/test_mor_row_delta_producer.py index 490f1aff0b..c3a7ce1953 100644 --- a/tests/table/test_mor_row_delta_producer.py +++ b/tests/table/test_mor_row_delta_producer.py @@ -139,6 +139,51 @@ def test_write_position_delete_file_empty_positions_raises(catalog: Catalog) -> write_position_delete_file(table.io, table.metadata, data_file, []) +def test_write_position_delete_file_negative_position_raises(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file_negative_position") + data_file = _append_initial_rows(table) + + with pytest.raises(ValueError, match="non-negative"): + write_position_delete_file(table.io, table.metadata, data_file, [-1]) + + +def test_write_position_delete_file_rejects_non_data_reference(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file_rejects_non_data_reference") + data_file = _append_initial_rows(table) + delete_file = write_position_delete_file(table.io, table.metadata, data_file, [0]) + + with pytest.raises(ValueError, match="referenced_data_file must be a DATA file"): + write_position_delete_file(table.io, table.metadata, delete_file, [0]) + + +def test_write_position_delete_file_falls_back_to_default_spec_id(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_write_position_delete_file_falls_back_to_default_spec_id") + data_file = _append_initial_rows(table) + fresh_data_file = DataFile.from_args( + _table_format_version=table.metadata.format_version, + content=data_file.content, + file_path=data_file.file_path, + file_format=data_file.file_format, + partition=data_file.partition, + record_count=data_file.record_count, + file_size_in_bytes=data_file.file_size_in_bytes, + column_sizes=data_file.column_sizes, + value_counts=data_file.value_counts, + null_value_counts=data_file.null_value_counts, + nan_value_counts=data_file.nan_value_counts, + lower_bounds=data_file.lower_bounds, + upper_bounds=data_file.upper_bounds, + key_metadata=data_file.key_metadata, + split_offsets=data_file.split_offsets, + equality_ids=data_file.equality_ids, + sort_order_id=data_file.sort_order_id, + ) + + delete_file = write_position_delete_file(table.io, table.metadata, fresh_data_file, [0]) + + assert delete_file.spec_id == table.metadata.default_spec_id + + def test_write_position_delete_file_dedupes_and_sorts_positions(catalog: Catalog) -> None: table = _create_v2_table(catalog, "default.test_write_position_delete_file_dedupes") data_file = _append_initial_rows(table) @@ -183,6 +228,14 @@ def test_write_position_delete_file_accepts_large_position(catalog: Catalog) -> assert rows.column("pos").to_pylist() == [large_position] +def test_append_delete_file_rejects_data_file(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_append_delete_file_rejects_data_file") + data_file = _append_initial_rows(table) + + with pytest.raises(ValueError, match="append_delete_file requires a delete file"): + _commit_delete_file(table, data_file) + + def test_position_delete_sequence_number_does_not_affect_later_appends(catalog: Catalog) -> None: table = _create_v2_table(catalog, "default.test_position_delete_sequence_number_scoping") data_file = _append_initial_rows(table) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index f1b7bf8df2..b1e26c62ba 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -987,8 +987,10 @@ def test_positional_delete_schema_pos_is_long() -> None: assert file_path_field.name == "file_path" assert isinstance(file_path_field.field_type, StringType) + assert file_path_field.required is True assert pos_field.name == "pos" assert isinstance(pos_field.field_type, LongType) + assert pos_field.required is True def test_positional_delete_schema_pos_serializes_as_avro_long() -> None: From eeec84eb48ae19961a2fe3c5ce84003680d8f4c7 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 21:44:44 -0500 Subject: [PATCH 3/5] Reject position deletes on v1 tables and write non-null delete columns Guard write_position_delete_file against non-v2 tables (a v1 DataFile has no content field and would be corrupt), write file_path and pos as non-nullable Arrow fields to match the required positional-delete schema, and restrict append_delete_file to position deletes since scan planning cannot yet read equality deletes. --- pyiceberg/io/pyarrow.py | 6 ++- pyiceberg/table/update/snapshot.py | 4 +- tests/table/test_mor_row_delta_producer.py | 43 +++++++++++++++++++++- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 16ce909169..bff4fdc022 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2703,6 +2703,8 @@ def write_position_delete_file( spec_id copied from the referenced data file so it is partition-scoped, and record_count equal to the number of deleted positions. """ + if table_metadata.format_version < 2: + raise ValueError(f"Position deletes are only supported for v2+ tables, got v{table_metadata.format_version}") if referenced_data_file.content != DataFileContent.DATA: raise ValueError(f"referenced_data_file must be a DATA file, got {referenced_data_file.content}") @@ -2728,8 +2730,8 @@ def write_position_delete_file( ) arrow_schema = pa.schema( [ - pa.field("file_path", pa.string(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"}), - pa.field("pos", pa.int64(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"}), + pa.field("file_path", pa.string(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"}), + pa.field("pos", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"}), ] ) arrow_table = pa.Table.from_arrays( diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 52ee4fb10c..bd01b62ab3 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -151,8 +151,8 @@ def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: return self def append_delete_file(self, delete_file: DataFile) -> _SnapshotProducer[U]: - if delete_file.content not in (DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES): - raise ValueError(f"append_delete_file requires a delete file, got {delete_file.content}") + if delete_file.content != DataFileContent.POSITION_DELETES: + raise ValueError(f"append_delete_file requires a position-delete file, got {delete_file.content}") self._added_delete_files.append(delete_file) return self diff --git a/tests/table/test_mor_row_delta_producer.py b/tests/table/test_mor_row_delta_producer.py index c3a7ce1953..04f431b108 100644 --- a/tests/table/test_mor_row_delta_producer.py +++ b/tests/table/test_mor_row_delta_producer.py @@ -125,8 +125,10 @@ def test_write_position_delete_file(catalog: Catalog) -> None: schema, rows = _read_delete_parquet(table, delete_file) assert schema.field("file_path").metadata == {PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"} assert schema.field("file_path").type == pa.string() + assert schema.field("file_path").nullable is False assert schema.field("pos").metadata == {PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"} assert schema.field("pos").type == pa.int64() + assert schema.field("pos").nullable is False assert rows.column("pos").to_pylist() == [0, 2] assert rows.column("file_path").to_pylist() == [data_file.file_path, data_file.file_path] @@ -156,6 +158,45 @@ def test_write_position_delete_file_rejects_non_data_reference(catalog: Catalog) write_position_delete_file(table.io, table.metadata, delete_file, [0]) +def test_write_position_delete_file_rejects_v1_table(catalog: Catalog) -> None: + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + table = catalog.create_table( + identifier="default.test_write_position_delete_file_rejects_v1_table", + schema=ICEBERG_SCHEMA, + properties={"format-version": "1"}, + ) + table.append(_arrow_table([{"id": 1, "data": "a"}, {"id": 2, "data": "b"}])) + data_file = _current_data_files(table)[0] + + with pytest.raises(ValueError, match="v2"): + write_position_delete_file(table.io, table.metadata, data_file, [0]) + + +def test_append_delete_file_rejects_equality_delete(catalog: Catalog) -> None: + table = _create_v2_table(catalog, "default.test_append_delete_file_rejects_equality_delete") + data_file = _append_initial_rows(table) + eq_delete = DataFile.from_args( + _table_format_version=table.metadata.format_version, + content=DataFileContent.EQUALITY_DELETES, + file_path=data_file.file_path, + file_format=data_file.file_format, + partition=data_file.partition, + record_count=1, + file_size_in_bytes=1, + equality_ids=[1], + sort_order_id=None, + key_metadata=None, + ) + + with pytest.raises(ValueError, match="append_delete_file requires a position-delete file"): + with table.transaction() as tx: + with tx.update_snapshot().row_delta() as row_delta: + row_delta.append_delete_file(eq_delete) + + def test_write_position_delete_file_falls_back_to_default_spec_id(catalog: Catalog) -> None: table = _create_v2_table(catalog, "default.test_write_position_delete_file_falls_back_to_default_spec_id") data_file = _append_initial_rows(table) @@ -232,7 +273,7 @@ def test_append_delete_file_rejects_data_file(catalog: Catalog) -> None: table = _create_v2_table(catalog, "default.test_append_delete_file_rejects_data_file") data_file = _append_initial_rows(table) - with pytest.raises(ValueError, match="append_delete_file requires a delete file"): + with pytest.raises(ValueError, match="append_delete_file requires a position-delete file"): _commit_delete_file(table, data_file) From f5b52d587ffdce8dc29473681dc837f0ed6c47a8 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 22:13:05 -0500 Subject: [PATCH 4/5] Confine append_delete_file to the row-delta producer Move append_delete_file off the shared _SnapshotProducer base so it is no longer inherited by fast_append/merge_append, where committing position deletes would produce an append snapshot that logically deletes rows. --- pyiceberg/table/update/snapshot.py | 12 ++++++------ tests/table/test_mor_row_delta_producer.py | 10 ++++++++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index bd01b62ab3..c6e016e2e8 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -150,12 +150,6 @@ def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) return self - def append_delete_file(self, delete_file: DataFile) -> _SnapshotProducer[U]: - if delete_file.content != DataFileContent.POSITION_DELETES: - raise ValueError(f"append_delete_file requires a position-delete file, got {delete_file.content}") - self._added_delete_files.append(delete_file) - return self - def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._deleted_data_files.add(data_file) return self @@ -579,6 +573,12 @@ class _RowDeltaFiles(_SnapshotProducer["_RowDeltaFiles"]): unassigned and stamped at commit, so delete seq == data seq == snapshot seq (a true row delta). """ + def append_delete_file(self, delete_file: DataFile) -> _RowDeltaFiles: + if delete_file.content != DataFileContent.POSITION_DELETES: + raise ValueError(f"append_delete_file requires a position-delete file, got {delete_file.content}") + self._added_delete_files.append(delete_file) + return self + def _existing_manifests(self) -> list[ManifestFile]: """Carry forward all existing manifests from the parent snapshot, like _FastAppendFiles.""" existing_manifests = [] diff --git a/tests/table/test_mor_row_delta_producer.py b/tests/table/test_mor_row_delta_producer.py index 04f431b108..18fb4c9861 100644 --- a/tests/table/test_mor_row_delta_producer.py +++ b/tests/table/test_mor_row_delta_producer.py @@ -277,6 +277,16 @@ def test_append_delete_file_rejects_data_file(catalog: Catalog) -> None: _commit_delete_file(table, data_file) +def test_append_delete_file_not_exposed_on_fast_append(catalog: Catalog) -> None: + # Delete files must only be appendable via the row-delta path, never via fast/merge append, + # which would commit row deletions under append semantics. + table = _create_v2_table(catalog, "default.test_append_delete_file_not_exposed_on_fast_append") + with table.transaction() as tx: + assert not hasattr(tx.update_snapshot().fast_append(), "append_delete_file") + assert not hasattr(tx.update_snapshot().merge_append(), "append_delete_file") + assert hasattr(tx.update_snapshot().row_delta(), "append_delete_file") + + def test_position_delete_sequence_number_does_not_affect_later_appends(catalog: Catalog) -> None: table = _create_v2_table(catalog, "default.test_position_delete_sequence_number_scoping") data_file = _append_initial_rows(table) From f7460e8e5e98607aa5f21670974d31be5cbaef7d Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Wed, 24 Jun 2026 08:27:07 -0500 Subject: [PATCH 5/5] chore: satisfy mypy on MoR row-delta producer tests --- tests/table/test_mor_row_delta_producer.py | 2 ++ tests/utils/test_manifest.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/table/test_mor_row_delta_producer.py b/tests/table/test_mor_row_delta_producer.py index 18fb4c9861..e0ebb573ce 100644 --- a/tests/table/test_mor_row_delta_producer.py +++ b/tests/table/test_mor_row_delta_producer.py @@ -251,6 +251,7 @@ def test_row_delta_position_delete_end_to_end(catalog: Catalog) -> None: current_snapshot = table.current_snapshot() assert current_snapshot is not None + assert current_snapshot.summary is not None assert current_snapshot.summary.operation == Operation.OVERWRITE reloaded = catalog.load_table(identifier) @@ -293,6 +294,7 @@ def test_position_delete_sequence_number_does_not_affect_later_appends(catalog: first_snapshot = table.current_snapshot() assert first_snapshot is not None first_sequence_number = first_snapshot.sequence_number + assert first_sequence_number is not None delete_file = write_position_delete_file( table.io, diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index b1e26c62ba..2cebf59daf 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,6 +16,7 @@ # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory +from typing import Any, cast import fastavro import pytest @@ -997,7 +998,7 @@ def test_positional_delete_schema_pos_serializes_as_avro_long() -> None: from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA from pyiceberg.utils.schema_conversion import AvroSchemaConversion - avro_schema = AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete") + avro_schema = cast(dict[str, Any], AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete")) pos_field = next(field for field in avro_schema["fields"] if field["name"] == "pos") assert "long" in pos_field["type"]