From d434e80a1520209a59400d812badbc395fd1c0cf Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 10:16:15 -0500 Subject: [PATCH 1/2] Add metadata-file compression (gzip write parity + experimental zstd) PyIceberg could read .gz.metadata.json but never wrote compressed metadata: new_table_metadata_file_location always emitted .metadata.json and the standard `write.metadata.compression-codec` property was read nowhere in production code. This adds full read/write parity: - TableProperties.WRITE_METADATA_COMPRESSION (default "none", matching Java). - serializers: Compressor.from_codec_name() validates codec names and raises ValueError on unknown codecs (mirrors Java's Codec.fromName); metadata_file_extension() builds the filename suffix. - new_table_metadata_file_location honors the property and fails loudly on an unknown codec. - Read path now also handles legacy Java ".metadata.json.gz" files and experimental ".zst.metadata.json". - zstd is NON-STANDARD/experimental (Spark/Java cannot read it) and is clearly marked as such. - Version-parsing regex tolerates the codec infix and legacy suffix. Tests: codec detection, loud rejection of unknown codecs, and real on-disk round trips (gzip/zstd/none) via PyArrowFileIO. --- pyiceberg/catalog/__init__.py | 7 ++- pyiceberg/serializers.py | 50 +++++++++++++++++- pyiceberg/table/__init__.py | 2 + pyiceberg/table/locations.py | 9 +++- tests/test_serializers.py | 96 +++++++++++++++++++++++++++++++++-- 5 files changed, 157 insertions(+), 7 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 95ceaa539f..d1fd5f53a9 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -102,8 +102,11 @@ (\d+) # version number - # separator ([\w-]{36}) # UUID (36 characters, including hyphens) - (?:\.\w+)? # optional codec name - \.metadata\.json # file extension + (?: + (?:\.\w+)? # optional codec name + \.metadata\.json + | \.metadata\.json\.gz + ) # file extension """, re.X, ) diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index 726e6b5f62..6dc088b7b6 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -21,18 +21,46 @@ from abc import ABC, abstractmethod from collections.abc import Callable +import zstandard as zstd + from pyiceberg.io import InputFile, InputStream, OutputFile from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 from pyiceberg.utils.config import Config GZIP = "gzip" +NONE = "none" +ZSTD = "zstd" class Compressor(ABC): @staticmethod def get_compressor(location: str) -> Compressor: - return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR + if location.endswith((".gz.metadata.json", ".metadata.json.gz")): + return GzipCompressor() + elif location.endswith(".zst.metadata.json"): + return ZstdCompressor() + else: + # Plain ".metadata.json" or any non-metadata file: no decompression. + # NB: an unknown codec extension (e.g. ".lz4.metadata.json") falls through + # to NOOP here, matching Java's TableMetadataParser.Codec.fromFileName, + # which treats any non-".gz" infix as NONE. + return NOOP_COMPRESSOR + + @staticmethod + def from_codec_name(codec_name: str | None) -> Compressor: + if codec_name is None: + return NOOP_COMPRESSOR + + normalized_codec_name = codec_name.lower() + if normalized_codec_name == NONE: + return NOOP_COMPRESSOR + elif normalized_codec_name == GZIP: + return GzipCompressor() + elif normalized_codec_name == ZSTD: + return ZstdCompressor() + else: + raise ValueError(f"Unsupported metadata compression codec: {codec_name}") @abstractmethod def stream_decompressor(self, inp: InputStream) -> InputStream: @@ -73,6 +101,26 @@ def bytes_compressor(self) -> Callable[[bytes], bytes]: return gzip.compress +# Zstd metadata compression is experimental and non-standard; Spark/Java readers cannot read it. +class ZstdCompressor(Compressor): + def stream_decompressor(self, inp: InputStream) -> InputStream: + return zstd.ZstdDecompressor().stream_reader(inp) + + def bytes_compressor(self) -> Callable[[bytes], bytes]: + return lambda b: zstd.ZstdCompressor().compress(b) + + +def metadata_file_extension(codec_name: str | None) -> str: + Compressor.from_codec_name(codec_name) + + normalized_codec_name = NONE if codec_name is None else codec_name.lower() + return { + NONE: ".metadata.json", + GZIP: ".gz.metadata.json", + ZSTD: ".zst.metadata.json", + }[normalized_codec_name] + + class FromByteStream: """A collection of methods that deserialize dictionaries into Iceberg objects.""" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..f027ba8086 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -174,6 +174,8 @@ class TableProperties: WRITE_FILE_FORMAT = "write.format.default" WRITE_FILE_FORMAT_DEFAULT = "parquet" WRITE_METADATA_PATH = "write.metadata.path" + WRITE_METADATA_COMPRESSION = "write.metadata.compression-codec" + WRITE_METADATA_COMPRESSION_DEFAULT = "none" DELETE_MODE = "write.delete.mode" DELETE_MODE_COPY_ON_WRITE = "copy-on-write" diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 771c6b5a0f..f7616ccb5b 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -85,7 +85,14 @@ def new_table_metadata_file_location(self, new_version: int = 0) -> str: if new_version < 0: raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer") - file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json" + from pyiceberg.serializers import metadata_file_extension + from pyiceberg.table import TableProperties + + codec = self.table_properties.get( + TableProperties.WRITE_METADATA_COMPRESSION, TableProperties.WRITE_METADATA_COMPRESSION_DEFAULT + ) + suffix = metadata_file_extension(codec) + file_name = f"{new_version:05d}-{uuid.uuid4()}{suffix}" return self.new_metadata_location(file_name) def new_metadata_location(self, metadata_file_name: str) -> str: diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 53ce6fcd42..9594cdaa58 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -18,14 +18,24 @@ import json import os import uuid +from pathlib import Path from typing import Any import pytest from pytest_mock import MockFixture -from pyiceberg.serializers import ToOutputFile -from pyiceberg.table import StaticTable -from pyiceberg.table.metadata import TableMetadataV1 +from pyiceberg.serializers import ( + Compressor, + FromInputFile, + GzipCompressor, + NoopCompressor, + ToOutputFile, + ZstdCompressor, + metadata_file_extension, +) +from pyiceberg.table import StaticTable, TableProperties +from pyiceberg.table.locations import load_location_provider +from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 from pyiceberg.table.update import AssertRefSnapshotId, TableRequirement from pyiceberg.typedef import IcebergBaseModel @@ -60,3 +70,83 @@ class ExampleRequest(IcebergBaseModel): dumped_json = request.model_dump_json() expected_json = """{"type":"assert-ref-snapshot-id","ref":"main","snapshot-id":null}""" assert expected_json in dumped_json + + +@pytest.mark.parametrize( + ("location", "compressor_type"), + [ + ("s3://bucket/table/metadata/00000-table.gz.metadata.json", GzipCompressor), + ("s3://bucket/table/metadata/00000-table.metadata.json.gz", GzipCompressor), + ("s3://bucket/table/metadata/00000-table.zst.metadata.json", ZstdCompressor), + ("s3://bucket/table/metadata/00000-table.metadata.json", NoopCompressor), + ], +) +def test_get_compressor_detects_metadata_compression(location: str, compressor_type: type[Compressor]) -> None: + assert isinstance(Compressor.get_compressor(location), compressor_type) + + +@pytest.mark.parametrize( + ("codec_name", "compressor_type"), + [ + (None, NoopCompressor), + ("none", NoopCompressor), + ("gzip", GzipCompressor), + ("GZIP", GzipCompressor), + ("zstd", ZstdCompressor), + ], +) +def test_from_codec_name(codec_name: str | None, compressor_type: type[Compressor]) -> None: + assert isinstance(Compressor.from_codec_name(codec_name), compressor_type) + + +@pytest.mark.parametrize("codec_name", ["lz4", "snappy", "bogus"]) +def test_from_codec_name_raises_for_unknown_codec(codec_name: str) -> None: + with pytest.raises(ValueError, match=f"Unsupported metadata compression codec: {codec_name}"): + Compressor.from_codec_name(codec_name) + + +@pytest.mark.parametrize("codec_name", ["none", "gzip", "zstd"]) +def test_table_metadata_round_trip_with_compression( + tmp_path: Path, example_table_metadata_v2: dict[str, Any], codec_name: str +) -> None: + from pyiceberg.io.pyarrow import PyArrowFileIO + + metadata = TableMetadataV2(**example_table_metadata_v2) + file_io = PyArrowFileIO() + metadata_location = str(tmp_path / f"{uuid.uuid4()}{metadata_file_extension(codec_name)}") + + ToOutputFile.table_metadata(metadata, file_io.new_output(location=metadata_location), overwrite=True) + + parsed_metadata = FromInputFile.table_metadata(file_io.new_input(location=metadata_location)) + assert parsed_metadata == metadata + assert parsed_metadata.table_uuid == metadata.table_uuid + assert parsed_metadata.schema() == metadata.schema() + + +@pytest.mark.parametrize( + ("table_properties", "expected_suffix"), + [ + ({}, ".metadata.json"), + ({TableProperties.WRITE_METADATA_COMPRESSION: "none"}, ".metadata.json"), + ({TableProperties.WRITE_METADATA_COMPRESSION: "gzip"}, ".gz.metadata.json"), + ({TableProperties.WRITE_METADATA_COMPRESSION: "zstd"}, ".zst.metadata.json"), + ], +) +def test_new_table_metadata_file_location_uses_metadata_compression( + table_properties: dict[str, str], expected_suffix: str +) -> None: + provider = load_location_provider(table_location="table_location", table_properties=table_properties) + + metadata_location = provider.new_table_metadata_file_location(new_version=3) + + assert metadata_location.startswith("table_location/metadata/00003-") + assert metadata_location.endswith(expected_suffix) + + +def test_new_table_metadata_file_location_raises_for_unknown_compression() -> None: + provider = load_location_provider( + table_location="table_location", table_properties={TableProperties.WRITE_METADATA_COMPRESSION: "bogus"} + ) + + with pytest.raises(ValueError, match="Unsupported metadata compression codec: bogus"): + provider.new_table_metadata_file_location(new_version=3) From ea19ee253bade115d3b80b13c54ccf5cb9575974 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 23 Jun 2026 20:24:34 -0500 Subject: [PATCH 2/2] Drop experimental zstd and legacy .metadata.json.gz handling Keep only gzip metadata-compression write parity. zstd produces metadata that Java/Spark cannot read (their codec set is NONE/GZIP only), so it must not be selectable via the standard write.metadata.compression-codec property. Also revert the .metadata.json.gz read/regex additions, which were unnecessary for write parity and left StaticTable.from_metadata mishandling that name. Strengthen the round-trip test to assert gzip magic bytes are actually written. --- pyiceberg/catalog/__init__.py | 7 ++----- pyiceberg/serializers.py | 29 ++++------------------------- tests/test_serializers.py | 18 +++++++++++------- 3 files changed, 17 insertions(+), 37 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index d1fd5f53a9..95ceaa539f 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -102,11 +102,8 @@ (\d+) # version number - # separator ([\w-]{36}) # UUID (36 characters, including hyphens) - (?: - (?:\.\w+)? # optional codec name - \.metadata\.json - | \.metadata\.json\.gz - ) # file extension + (?:\.\w+)? # optional codec name + \.metadata\.json # file extension """, re.X, ) diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index 6dc088b7b6..ba024fd63c 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -21,8 +21,6 @@ from abc import ABC, abstractmethod from collections.abc import Callable -import zstandard as zstd - from pyiceberg.io import InputFile, InputStream, OutputFile from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 @@ -30,22 +28,15 @@ GZIP = "gzip" NONE = "none" -ZSTD = "zstd" class Compressor(ABC): @staticmethod def get_compressor(location: str) -> Compressor: - if location.endswith((".gz.metadata.json", ".metadata.json.gz")): - return GzipCompressor() - elif location.endswith(".zst.metadata.json"): - return ZstdCompressor() - else: - # Plain ".metadata.json" or any non-metadata file: no decompression. - # NB: an unknown codec extension (e.g. ".lz4.metadata.json") falls through - # to NOOP here, matching Java's TableMetadataParser.Codec.fromFileName, - # which treats any non-".gz" infix as NONE. - return NOOP_COMPRESSOR + # An unknown codec extension (e.g. ".lz4.metadata.json") falls through to NOOP, + # matching Java's TableMetadataParser.Codec.fromFileName, which treats any + # non-".gz" infix as NONE. + return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR @staticmethod def from_codec_name(codec_name: str | None) -> Compressor: @@ -57,8 +48,6 @@ def from_codec_name(codec_name: str | None) -> Compressor: return NOOP_COMPRESSOR elif normalized_codec_name == GZIP: return GzipCompressor() - elif normalized_codec_name == ZSTD: - return ZstdCompressor() else: raise ValueError(f"Unsupported metadata compression codec: {codec_name}") @@ -101,15 +90,6 @@ def bytes_compressor(self) -> Callable[[bytes], bytes]: return gzip.compress -# Zstd metadata compression is experimental and non-standard; Spark/Java readers cannot read it. -class ZstdCompressor(Compressor): - def stream_decompressor(self, inp: InputStream) -> InputStream: - return zstd.ZstdDecompressor().stream_reader(inp) - - def bytes_compressor(self) -> Callable[[bytes], bytes]: - return lambda b: zstd.ZstdCompressor().compress(b) - - def metadata_file_extension(codec_name: str | None) -> str: Compressor.from_codec_name(codec_name) @@ -117,7 +97,6 @@ def metadata_file_extension(codec_name: str | None) -> str: return { NONE: ".metadata.json", GZIP: ".gz.metadata.json", - ZSTD: ".zst.metadata.json", }[normalized_codec_name] diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 9594cdaa58..627295ba54 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -30,7 +30,6 @@ GzipCompressor, NoopCompressor, ToOutputFile, - ZstdCompressor, metadata_file_extension, ) from pyiceberg.table import StaticTable, TableProperties @@ -76,9 +75,8 @@ class ExampleRequest(IcebergBaseModel): ("location", "compressor_type"), [ ("s3://bucket/table/metadata/00000-table.gz.metadata.json", GzipCompressor), - ("s3://bucket/table/metadata/00000-table.metadata.json.gz", GzipCompressor), - ("s3://bucket/table/metadata/00000-table.zst.metadata.json", ZstdCompressor), ("s3://bucket/table/metadata/00000-table.metadata.json", NoopCompressor), + ("s3://bucket/table/metadata/00000-table.lz4.metadata.json", NoopCompressor), ], ) def test_get_compressor_detects_metadata_compression(location: str, compressor_type: type[Compressor]) -> None: @@ -92,20 +90,19 @@ def test_get_compressor_detects_metadata_compression(location: str, compressor_t ("none", NoopCompressor), ("gzip", GzipCompressor), ("GZIP", GzipCompressor), - ("zstd", ZstdCompressor), ], ) def test_from_codec_name(codec_name: str | None, compressor_type: type[Compressor]) -> None: assert isinstance(Compressor.from_codec_name(codec_name), compressor_type) -@pytest.mark.parametrize("codec_name", ["lz4", "snappy", "bogus"]) +@pytest.mark.parametrize("codec_name", ["zstd", "lz4", "snappy", "bogus"]) def test_from_codec_name_raises_for_unknown_codec(codec_name: str) -> None: with pytest.raises(ValueError, match=f"Unsupported metadata compression codec: {codec_name}"): Compressor.from_codec_name(codec_name) -@pytest.mark.parametrize("codec_name", ["none", "gzip", "zstd"]) +@pytest.mark.parametrize("codec_name", ["none", "gzip"]) def test_table_metadata_round_trip_with_compression( tmp_path: Path, example_table_metadata_v2: dict[str, Any], codec_name: str ) -> None: @@ -117,6 +114,14 @@ def test_table_metadata_round_trip_with_compression( ToOutputFile.table_metadata(metadata, file_io.new_output(location=metadata_location), overwrite=True) + raw_bytes = Path(metadata_location).read_bytes() + if codec_name == "gzip": + # Java writes gzip-compressed metadata when this codec is set; assert we match by + # checking the gzip magic bytes rather than relying on the (lossless) round trip. + assert raw_bytes[:2] == b"\x1f\x8b" + else: + assert raw_bytes.lstrip().startswith(b"{") + parsed_metadata = FromInputFile.table_metadata(file_io.new_input(location=metadata_location)) assert parsed_metadata == metadata assert parsed_metadata.table_uuid == metadata.table_uuid @@ -129,7 +134,6 @@ def test_table_metadata_round_trip_with_compression( ({}, ".metadata.json"), ({TableProperties.WRITE_METADATA_COMPRESSION: "none"}, ".metadata.json"), ({TableProperties.WRITE_METADATA_COMPRESSION: "gzip"}, ".gz.metadata.json"), - ({TableProperties.WRITE_METADATA_COMPRESSION: "zstd"}, ".zst.metadata.json"), ], ) def test_new_table_metadata_file_location_uses_metadata_compression(