diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index 726e6b5f62..ba024fd63c 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -27,13 +27,30 @@ from pyiceberg.utils.config import Config GZIP = "gzip" +NONE = "none" class Compressor(ABC): @staticmethod def get_compressor(location: str) -> Compressor: + # An unknown codec extension (e.g. ".lz4.metadata.json") falls through to NOOP, + # matching Java's TableMetadataParser.Codec.fromFileName, which treats any + # non-".gz" infix as NONE. return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR + @staticmethod + def from_codec_name(codec_name: str | None) -> Compressor: + if codec_name is None: + return NOOP_COMPRESSOR + + normalized_codec_name = codec_name.lower() + if normalized_codec_name == NONE: + return NOOP_COMPRESSOR + elif normalized_codec_name == GZIP: + return GzipCompressor() + else: + raise ValueError(f"Unsupported metadata compression codec: {codec_name}") + @abstractmethod def stream_decompressor(self, inp: InputStream) -> InputStream: """Return a stream decompressor. @@ -73,6 +90,16 @@ def bytes_compressor(self) -> Callable[[bytes], bytes]: return gzip.compress +def metadata_file_extension(codec_name: str | None) -> str: + Compressor.from_codec_name(codec_name) + + normalized_codec_name = NONE if codec_name is None else codec_name.lower() + return { + NONE: ".metadata.json", + GZIP: ".gz.metadata.json", + }[normalized_codec_name] + + class FromByteStream: """A collection of methods that deserialize dictionaries into Iceberg objects.""" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..f027ba8086 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -174,6 +174,8 @@ class TableProperties: WRITE_FILE_FORMAT = "write.format.default" WRITE_FILE_FORMAT_DEFAULT = "parquet" WRITE_METADATA_PATH = "write.metadata.path" + WRITE_METADATA_COMPRESSION = "write.metadata.compression-codec" + WRITE_METADATA_COMPRESSION_DEFAULT = "none" DELETE_MODE = "write.delete.mode" DELETE_MODE_COPY_ON_WRITE = "copy-on-write" diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 771c6b5a0f..f7616ccb5b 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -85,7 +85,14 @@ def new_table_metadata_file_location(self, new_version: int = 0) -> str: if new_version < 0: raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer") - file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json" + from pyiceberg.serializers import metadata_file_extension + from pyiceberg.table import TableProperties + + codec = self.table_properties.get( + TableProperties.WRITE_METADATA_COMPRESSION, TableProperties.WRITE_METADATA_COMPRESSION_DEFAULT + ) + suffix = metadata_file_extension(codec) + file_name = f"{new_version:05d}-{uuid.uuid4()}{suffix}" return self.new_metadata_location(file_name) def new_metadata_location(self, metadata_file_name: str) -> str: diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 53ce6fcd42..627295ba54 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -18,14 +18,23 @@ import json import os import uuid +from pathlib import Path from typing import Any import pytest from pytest_mock import MockFixture -from pyiceberg.serializers import ToOutputFile -from pyiceberg.table import StaticTable -from pyiceberg.table.metadata import TableMetadataV1 +from pyiceberg.serializers import ( + Compressor, + FromInputFile, + GzipCompressor, + NoopCompressor, + ToOutputFile, + metadata_file_extension, +) +from pyiceberg.table import StaticTable, TableProperties +from pyiceberg.table.locations import load_location_provider +from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 from pyiceberg.table.update import AssertRefSnapshotId, TableRequirement from pyiceberg.typedef import IcebergBaseModel @@ -60,3 +69,88 @@ class ExampleRequest(IcebergBaseModel): dumped_json = request.model_dump_json() expected_json = """{"type":"assert-ref-snapshot-id","ref":"main","snapshot-id":null}""" assert expected_json in dumped_json + + +@pytest.mark.parametrize( + ("location", "compressor_type"), + [ + ("s3://bucket/table/metadata/00000-table.gz.metadata.json", GzipCompressor), + ("s3://bucket/table/metadata/00000-table.metadata.json", NoopCompressor), + ("s3://bucket/table/metadata/00000-table.lz4.metadata.json", NoopCompressor), + ], +) +def test_get_compressor_detects_metadata_compression(location: str, compressor_type: type[Compressor]) -> None: + assert isinstance(Compressor.get_compressor(location), compressor_type) + + +@pytest.mark.parametrize( + ("codec_name", "compressor_type"), + [ + (None, NoopCompressor), + ("none", NoopCompressor), + ("gzip", GzipCompressor), + ("GZIP", GzipCompressor), + ], +) +def test_from_codec_name(codec_name: str | None, compressor_type: type[Compressor]) -> None: + assert isinstance(Compressor.from_codec_name(codec_name), compressor_type) + + +@pytest.mark.parametrize("codec_name", ["zstd", "lz4", "snappy", "bogus"]) +def test_from_codec_name_raises_for_unknown_codec(codec_name: str) -> None: + with pytest.raises(ValueError, match=f"Unsupported metadata compression codec: {codec_name}"): + Compressor.from_codec_name(codec_name) + + +@pytest.mark.parametrize("codec_name", ["none", "gzip"]) +def test_table_metadata_round_trip_with_compression( + tmp_path: Path, example_table_metadata_v2: dict[str, Any], codec_name: str +) -> None: + from pyiceberg.io.pyarrow import PyArrowFileIO + + metadata = TableMetadataV2(**example_table_metadata_v2) + file_io = PyArrowFileIO() + metadata_location = str(tmp_path / f"{uuid.uuid4()}{metadata_file_extension(codec_name)}") + + ToOutputFile.table_metadata(metadata, file_io.new_output(location=metadata_location), overwrite=True) + + raw_bytes = Path(metadata_location).read_bytes() + if codec_name == "gzip": + # Java writes gzip-compressed metadata when this codec is set; assert we match by + # checking the gzip magic bytes rather than relying on the (lossless) round trip. + assert raw_bytes[:2] == b"\x1f\x8b" + else: + assert raw_bytes.lstrip().startswith(b"{") + + parsed_metadata = FromInputFile.table_metadata(file_io.new_input(location=metadata_location)) + assert parsed_metadata == metadata + assert parsed_metadata.table_uuid == metadata.table_uuid + assert parsed_metadata.schema() == metadata.schema() + + +@pytest.mark.parametrize( + ("table_properties", "expected_suffix"), + [ + ({}, ".metadata.json"), + ({TableProperties.WRITE_METADATA_COMPRESSION: "none"}, ".metadata.json"), + ({TableProperties.WRITE_METADATA_COMPRESSION: "gzip"}, ".gz.metadata.json"), + ], +) +def test_new_table_metadata_file_location_uses_metadata_compression( + table_properties: dict[str, str], expected_suffix: str +) -> None: + provider = load_location_provider(table_location="table_location", table_properties=table_properties) + + metadata_location = provider.new_table_metadata_file_location(new_version=3) + + assert metadata_location.startswith("table_location/metadata/00003-") + assert metadata_location.endswith(expected_suffix) + + +def test_new_table_metadata_file_location_raises_for_unknown_compression() -> None: + provider = load_location_provider( + table_location="table_location", table_properties={TableProperties.WRITE_METADATA_COMPRESSION: "bogus"} + ) + + with pytest.raises(ValueError, match="Unsupported metadata compression codec: bogus"): + provider.new_table_metadata_file_location(new_version=3)