Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(
NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType())
)
POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType()))


class ManifestFile(Record):
Expand Down
99 changes: 99 additions & 0 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
from typing import Any, cast

import fastavro
import pytest
Expand Down Expand Up @@ -973,3 +974,101 @@ def test_inherit_from_manifest_snapshot_id() -> None:
assert result.snapshot_id == 3051729675574597004
assert result.sequence_number == 1
assert result.file_sequence_number == 1


def test_positional_delete_schema_pos_is_long() -> None:
# Spec: the positional delete `pos` field (id 2147483545) is a long, not an int32.
# A v2 position-delete file can reference row positions in data files with more than
# 2**31 rows, so an int32 `pos` would silently overflow.
from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA
from pyiceberg.types import LongType, StringType

file_path_field = POSITIONAL_DELETE_SCHEMA.find_field(2147483546)
pos_field = POSITIONAL_DELETE_SCHEMA.find_field(2147483545)

assert file_path_field.name == "file_path"
assert isinstance(file_path_field.field_type, StringType)
assert pos_field.name == "pos"
assert isinstance(pos_field.field_type, LongType)


def test_positional_delete_schema_pos_serializes_as_avro_long() -> None:
from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA
from pyiceberg.utils.schema_conversion import AvroSchemaConversion

avro_schema = cast(dict[str, Any], AvroSchemaConversion().iceberg_to_avro(POSITIONAL_DELETE_SCHEMA, schema_name="pos_delete"))
pos_field = next(field for field in avro_schema["fields"] if field["name"] == "pos")

assert "long" in pos_field["type"]
assert "int" not in pos_field["type"]


def test_positional_delete_pos_above_int32_round_trips() -> None:
# A position beyond int32 range must survive conversion to/from the bound bytes
# that get stored in a position-delete DataFile's lower/upper bounds.
from pyiceberg.conversions import from_bytes, to_bytes
from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA

pos_type = POSITIONAL_DELETE_SCHEMA.find_field(2147483545).field_type
large_pos = 2**31 + 123456789 # well past int32 max (2147483647)

encoded = to_bytes(pos_type, large_pos)
# long encodes to 8 bytes; an int32 would have truncated to 4 bytes / wrapped.
assert len(encoded) == 8
assert from_bytes(pos_type, encoded) == large_pos


def test_position_delete_datafile_large_pos_bounds_round_trip_through_manifest(
test_schema: Schema,
test_partition_spec: PartitionSpec,
) -> None:
# A DataFile carrying POSITION_DELETES content with pos bounds beyond int32
# round-trips through a written/read v2 manifest's map<int, bytes> bounds.
from pyiceberg.conversions import from_bytes, to_bytes
from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA
from pyiceberg.types import LongType, StringType

io = load_file_io()
pos_type = POSITIONAL_DELETE_SCHEMA.find_field(2147483545).field_type
min_pos = 2**31 + 1
max_pos = 2**32 + 99
path_bytes = to_bytes(StringType(), "s3://bucket/data/big.parquet")

delete_file = DataFile.from_args(
_table_format_version=2,
content=DataFileContent.POSITION_DELETES,
file_path="s3://bucket/data/big-deletes.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=2,
file_size_in_bytes=128,
lower_bounds={2147483546: path_bytes, 2147483545: to_bytes(LongType(), min_pos)},
upper_bounds={2147483546: path_bytes, 2147483545: to_bytes(LongType(), max_pos)},
equality_ids=None,
sort_order_id=None,
)

with TemporaryDirectory() as tmpdir:
output = io.new_output(tmpdir + "/pos_delete_manifest.avro")
with write_manifest(
format_version=2,
spec=UNPARTITIONED_PARTITION_SPEC,
schema=test_schema,
output_file=output,
snapshot_id=1,
avro_compression="null",
) as writer:
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=1,
data_file=delete_file,
)
)
new_manifest = writer.to_manifest_file()

entries = new_manifest.fetch_manifest_entry(io)
read_file = entries[0].data_file
assert read_file.content == DataFileContent.POSITION_DELETES
assert from_bytes(pos_type, read_file.lower_bounds[2147483545]) == min_pos
assert from_bytes(pos_type, read_file.upper_bounds[2147483545]) == max_pos