From 7277b0fa41555c0332a69862d20e35af4d6bf26c Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 17:58:38 -0500 Subject: [PATCH 1/3] Fix POSITIONAL_DELETE_SCHEMA pos field to LongType The position-delete `pos` field (id 2147483545) is specified as a long, but pyiceberg typed it as IntegerType (int32). Data files with more than 2**31 rows would overflow the position on write/read. Correct it to LongType per the Iceberg spec and add round-trip coverage for positions above the int32 range. --- pyiceberg/manifest.py | 4 +- tests/utils/test_manifest.py | 87 ++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..434ffae623 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -774,9 +774,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} -POSITIONAL_DELETE_SCHEMA = Schema( - NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType()) -) +POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType())) class ManifestFile(Record): diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 40ad4bf221..2df3bd863b 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -973,3 +973,90 @@ def test_inherit_from_manifest_snapshot_id() -> None: assert result.snapshot_id == 3051729675574597004 assert result.sequence_number == 1 assert result.file_sequence_number == 1 + + +def test_positional_delete_schema_pos_is_long() -> None: + # Spec: the positional delete `pos` field (id 2147483545) is a long, not an int32. + # A v2 position-delete file can reference row positions in data files with more than + # 2**31 rows, so an int32 `pos` would silently overflow. + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + from pyiceberg.types import LongType, StringType + + file_path_field = POSITIONAL_DELETE_SCHEMA.find_field(2147483546) + pos_field = POSITIONAL_DELETE_SCHEMA.find_field(2147483545) + + assert file_path_field.name == "file_path" + assert isinstance(file_path_field.field_type, StringType) + assert pos_field.name == "pos" + assert isinstance(pos_field.field_type, LongType) + + +def test_positional_delete_pos_above_int32_round_trips() -> None: + # A position beyond int32 range must survive conversion to/from the bound bytes + # that get stored in a position-delete DataFile's lower/upper bounds. + from pyiceberg.conversions import from_bytes, to_bytes + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + + pos_type = POSITIONAL_DELETE_SCHEMA.find_field(2147483545).field_type + large_pos = 2**31 + 123456789 # well past int32 max (2147483647) + + encoded = to_bytes(pos_type, large_pos) + # long encodes to 8 bytes; an int32 would have truncated to 4 bytes / wrapped. + assert len(encoded) == 8 + assert from_bytes(pos_type, encoded) == large_pos + + +def test_positional_delete_datafile_large_pos_round_trips_through_manifest( + test_schema: Schema, + test_partition_spec: PartitionSpec, +) -> None: + # End-to-end: a POSITION_DELETES DataFile whose pos bounds exceed int32 must + # round-trip through a written/read v2 manifest without overflow. + from pyiceberg.conversions import from_bytes, to_bytes + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + from pyiceberg.types import LongType, StringType + + io = load_file_io() + pos_type = POSITIONAL_DELETE_SCHEMA.find_field(2147483545).field_type + min_pos = 2**31 + 1 + max_pos = 2**32 + 99 + path_bytes = to_bytes(StringType(), "s3://bucket/data/big.parquet") + + delete_file = DataFile.from_args( + _table_format_version=2, + content=DataFileContent.POSITION_DELETES, + file_path="s3://bucket/data/big-deletes.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=2, + file_size_in_bytes=128, + lower_bounds={2147483546: path_bytes, 2147483545: to_bytes(LongType(), min_pos)}, + upper_bounds={2147483546: path_bytes, 2147483545: to_bytes(LongType(), max_pos)}, + equality_ids=None, + sort_order_id=None, + ) + + with TemporaryDirectory() as tmpdir: + output = io.new_output(tmpdir + "/pos_delete_manifest.avro") + with write_manifest( + format_version=2, + spec=UNPARTITIONED_PARTITION_SPEC, + schema=test_schema, + output_file=output, + snapshot_id=1, + avro_compression="null", + ) as writer: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=1, + data_file=delete_file, + ) + ) + new_manifest = writer.to_manifest_file() + + entries = new_manifest.fetch_manifest_entry(io) + read_file = entries[0].data_file + assert read_file.content == DataFileContent.POSITION_DELETES + assert from_bytes(pos_type, read_file.lower_bounds[2147483545]) == min_pos + assert from_bytes(pos_type, read_file.upper_bounds[2147483545]) == max_pos From a0b31cc7f8233bfc0eaf9372d204126ee3123aa6 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 20:17:39 -0500 Subject: [PATCH 2/3] Strengthen positional delete pos tests with avro long assertion --- tests/utils/test_manifest.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 2df3bd863b..f1b7bf8df2 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -991,6 +991,17 @@ def test_positional_delete_schema_pos_is_long() -> None: assert isinstance(pos_field.field_type, LongType) +def test_positional_delete_schema_pos_serializes_as_avro_long() -> None: + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + from pyiceberg.utils.schema_conversion import AvroSchemaConversion + + avro_schema = AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete") + pos_field = next(field for field in avro_schema["fields"] if field["name"] == "pos") + + assert "long" in pos_field["type"] + assert "int" not in pos_field["type"] + + def test_positional_delete_pos_above_int32_round_trips() -> None: # A position beyond int32 range must survive conversion to/from the bound bytes # that get stored in a position-delete DataFile's lower/upper bounds. @@ -1006,12 +1017,12 @@ def test_positional_delete_pos_above_int32_round_trips() -> None: assert from_bytes(pos_type, encoded) == large_pos -def test_positional_delete_datafile_large_pos_round_trips_through_manifest( +def test_position_delete_datafile_large_pos_bounds_round_trip_through_manifest( test_schema: Schema, test_partition_spec: PartitionSpec, ) -> None: - # End-to-end: a POSITION_DELETES DataFile whose pos bounds exceed int32 must - # round-trip through a written/read v2 manifest without overflow. + # A DataFile carrying POSITION_DELETES content with pos bounds beyond int32 + # round-trips through a written/read v2 manifest's map bounds. from pyiceberg.conversions import from_bytes, to_bytes from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA from pyiceberg.types import LongType, StringType From e859fb450ab94586d75663947af03f7d03e96978 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Wed, 24 Jun 2026 08:26:30 -0500 Subject: [PATCH 3/3] chore: type the avro schema dict in positional-delete test --- tests/utils/test_manifest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index f1b7bf8df2..462bf34648 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,6 +16,7 @@ # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory +from typing import Any, cast import fastavro import pytest @@ -995,7 +996,7 @@ def test_positional_delete_schema_pos_serializes_as_avro_long() -> None: from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA from pyiceberg.utils.schema_conversion import AvroSchemaConversion - avro_schema = AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete") + avro_schema = cast(dict[str, Any], AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete")) pos_field = next(field for field in avro_schema["fields"] if field["name"] == "pos") assert "long" in pos_field["type"]