diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..434ffae623 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -774,9 +774,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} -POSITIONAL_DELETE_SCHEMA = Schema( - NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType()) -) +POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType())) class ManifestFile(Record): diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 40ad4bf221..462bf34648 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,6 +16,7 @@ # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory +from typing import Any, cast import fastavro import pytest @@ -973,3 +974,101 @@ def test_inherit_from_manifest_snapshot_id() -> None: assert result.snapshot_id == 3051729675574597004 assert result.sequence_number == 1 assert result.file_sequence_number == 1 + + +def test_positional_delete_schema_pos_is_long() -> None: + # Spec: the positional delete `pos` field (id 2147483545) is a long, not an int32. + # A v2 position-delete file can reference row positions in data files with more than + # 2**31 rows, so an int32 `pos` would silently overflow. + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + from pyiceberg.types import LongType, StringType + + file_path_field = POSITIONAL_DELETE_SCHEMA.find_field(2147483546) + pos_field = POSITIONAL_DELETE_SCHEMA.find_field(2147483545) + + assert file_path_field.name == "file_path" + assert isinstance(file_path_field.field_type, StringType) + assert pos_field.name == "pos" + assert isinstance(pos_field.field_type, LongType) + + +def test_positional_delete_schema_pos_serializes_as_avro_long() -> None: + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + from pyiceberg.utils.schema_conversion import AvroSchemaConversion + + avro_schema = cast(dict[str, Any], AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete")) + pos_field = next(field for field in avro_schema["fields"] if field["name"] == "pos") + + assert "long" in pos_field["type"] + assert "int" not in pos_field["type"] + + +def test_positional_delete_pos_above_int32_round_trips() -> None: + # A position beyond int32 range must survive conversion to/from the bound bytes + # that get stored in a position-delete DataFile's lower/upper bounds. + from pyiceberg.conversions import from_bytes, to_bytes + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + + pos_type = POSITIONAL_DELETE_SCHEMA.find_field(2147483545).field_type + large_pos = 2**31 + 123456789 # well past int32 max (2147483647) + + encoded = to_bytes(pos_type, large_pos) + # long encodes to 8 bytes; an int32 would have truncated to 4 bytes / wrapped. + assert len(encoded) == 8 + assert from_bytes(pos_type, encoded) == large_pos + + +def test_position_delete_datafile_large_pos_bounds_round_trip_through_manifest( + test_schema: Schema, + test_partition_spec: PartitionSpec, +) -> None: + # A DataFile carrying POSITION_DELETES content with pos bounds beyond int32 + # round-trips through a written/read v2 manifest's map bounds. + from pyiceberg.conversions import from_bytes, to_bytes + from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA + from pyiceberg.types import LongType, StringType + + io = load_file_io() + pos_type = POSITIONAL_DELETE_SCHEMA.find_field(2147483545).field_type + min_pos = 2**31 + 1 + max_pos = 2**32 + 99 + path_bytes = to_bytes(StringType(), "s3://bucket/data/big.parquet") + + delete_file = DataFile.from_args( + _table_format_version=2, + content=DataFileContent.POSITION_DELETES, + file_path="s3://bucket/data/big-deletes.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=2, + file_size_in_bytes=128, + lower_bounds={2147483546: path_bytes, 2147483545: to_bytes(LongType(), min_pos)}, + upper_bounds={2147483546: path_bytes, 2147483545: to_bytes(LongType(), max_pos)}, + equality_ids=None, + sort_order_id=None, + ) + + with TemporaryDirectory() as tmpdir: + output = io.new_output(tmpdir + "/pos_delete_manifest.avro") + with write_manifest( + format_version=2, + spec=UNPARTITIONED_PARTITION_SPEC, + schema=test_schema, + output_file=output, + snapshot_id=1, + avro_compression="null", + ) as writer: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=1, + data_file=delete_file, + ) + ) + new_manifest = writer.to_manifest_file() + + entries = new_manifest.fetch_manifest_entry(io) + read_file = entries[0].data_file + assert read_file.content == DataFileContent.POSITION_DELETES + assert from_bytes(pos_type, read_file.lower_bounds[2147483545]) == min_pos + assert from_bytes(pos_type, read_file.upper_bounds[2147483545]) == max_pos