Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pyiceberg/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,30 @@
from pyiceberg.utils.config import Config

GZIP = "gzip"
NONE = "none"


class Compressor(ABC):
@staticmethod
def get_compressor(location: str) -> Compressor:
    """Choose a compressor from the suffix of a metadata file location.

    Args:
        location: Location (path or URI) of the metadata file.

    Returns:
        A new ``GzipCompressor`` when ``location`` ends with
        ``.gz.metadata.json``; ``NOOP_COMPRESSOR`` for every other location.
    """
    if location.endswith(".gz.metadata.json"):
        return GzipCompressor()

    # Anything else -- including an unknown codec infix such as
    # ".lz4.metadata.json" -- is deliberately treated as uncompressed rather than
    # rejected. This mirrors Java's TableMetadataParser.Codec.fromFileName, which
    # treats any non-".gz" infix as NONE.
    return NOOP_COMPRESSOR

@staticmethod
def from_codec_name(codec_name: str | None) -> Compressor:
    """Resolve a metadata compression codec name to a compressor.

    Args:
        codec_name: Name of the codec, compared case-insensitively. ``None``
            is handled the same way as the ``"none"`` codec.

    Returns:
        ``NOOP_COMPRESSOR`` for ``None`` / ``"none"``, or a new
        ``GzipCompressor`` for ``"gzip"``.

    Raises:
        ValueError: If ``codec_name`` is any other value. The message reports
            the name exactly as the caller supplied it (not lower-cased).
    """
    # Fold the "no codec configured" case into the explicit "none" codec so a
    # single comparison below covers both.
    lowered = NONE if codec_name is None else codec_name.lower()

    if lowered == NONE:
        return NOOP_COMPRESSOR
    if lowered == GZIP:
        return GzipCompressor()

    raise ValueError(f"Unsupported metadata compression codec: {codec_name}")

@abstractmethod
def stream_decompressor(self, inp: InputStream) -> InputStream:
"""Return a stream decompressor.
Expand Down Expand Up @@ -73,6 +90,16 @@ def bytes_compressor(self) -> Callable[[bytes], bytes]:
return gzip.compress


def metadata_file_extension(codec_name: str | None) -> str:
    """Return the metadata file-name suffix that corresponds to a codec name.

    Args:
        codec_name: Name of the metadata compression codec, compared
            case-insensitively. ``None`` is treated as the ``"none"`` codec.

    Returns:
        ``".metadata.json"`` for ``None`` / ``"none"`` and
        ``".gz.metadata.json"`` for ``"gzip"``.

    Raises:
        ValueError: Propagated from ``Compressor.from_codec_name`` when the
            codec name is not supported.
    """
    # Called purely for validation: the returned compressor is discarded, but an
    # unsupported name raises here, before the table lookup below could fail
    # with a less descriptive KeyError.
    Compressor.from_codec_name(codec_name)

    suffix_by_codec = {
        NONE: ".metadata.json",
        GZIP: ".gz.metadata.json",
    }
    codec_key = codec_name.lower() if codec_name is not None else NONE
    return suffix_by_codec[codec_key]


class FromByteStream:
"""A collection of methods that deserialize dictionaries into Iceberg objects."""

Expand Down
2 changes: 2 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class TableProperties:
WRITE_FILE_FORMAT = "write.format.default"
WRITE_FILE_FORMAT_DEFAULT = "parquet"
WRITE_METADATA_PATH = "write.metadata.path"
WRITE_METADATA_COMPRESSION = "write.metadata.compression-codec"
WRITE_METADATA_COMPRESSION_DEFAULT = "none"

DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
Expand Down
9 changes: 8 additions & 1 deletion pyiceberg/table/locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ def new_table_metadata_file_location(self, new_version: int = 0) -> str:
if new_version < 0:
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
from pyiceberg.serializers import metadata_file_extension
from pyiceberg.table import TableProperties

codec = self.table_properties.get(
TableProperties.WRITE_METADATA_COMPRESSION, TableProperties.WRITE_METADATA_COMPRESSION_DEFAULT
)
suffix = metadata_file_extension(codec)
file_name = f"{new_version:05d}-{uuid.uuid4()}{suffix}"
return self.new_metadata_location(file_name)

def new_metadata_location(self, metadata_file_name: str) -> str:
Expand Down
100 changes: 97 additions & 3 deletions tests/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@
import json
import os
import uuid
from pathlib import Path
from typing import Any

import pytest
from pytest_mock import MockFixture

from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import StaticTable
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.serializers import (
Compressor,
FromInputFile,
GzipCompressor,
NoopCompressor,
ToOutputFile,
metadata_file_extension,
)
from pyiceberg.table import StaticTable, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
from pyiceberg.table.update import AssertRefSnapshotId, TableRequirement
from pyiceberg.typedef import IcebergBaseModel

Expand Down Expand Up @@ -60,3 +69,88 @@ class ExampleRequest(IcebergBaseModel):
dumped_json = request.model_dump_json()
expected_json = """{"type":"assert-ref-snapshot-id","ref":"main","snapshot-id":null}"""
assert expected_json in dumped_json


@pytest.mark.parametrize(
    ("location", "compressor_type"),
    [
        # Explicit gzip infix.
        ("s3://bucket/table/metadata/00000-table.gz.metadata.json", GzipCompressor),
        # No codec infix at all.
        ("s3://bucket/table/metadata/00000-table.metadata.json", NoopCompressor),
        # Unrecognised codec infix.
        ("s3://bucket/table/metadata/00000-table.lz4.metadata.json", NoopCompressor),
    ],
)
def test_get_compressor_detects_metadata_compression(location: str, compressor_type: type[Compressor]) -> None:
    """Check which compressor class ``Compressor.get_compressor`` returns for each location."""
    detected = Compressor.get_compressor(location)
    assert isinstance(detected, compressor_type)


@pytest.mark.parametrize(
    ("codec_name", "compressor_type"),
    [
        # No codec configured.
        (None, NoopCompressor),
        ("none", NoopCompressor),
        ("gzip", GzipCompressor),
        # Upper-case spelling of a supported codec.
        ("GZIP", GzipCompressor),
    ],
)
def test_from_codec_name(codec_name: str | None, compressor_type: type[Compressor]) -> None:
    """Check which compressor class ``Compressor.from_codec_name`` returns for each codec name."""
    resolved = Compressor.from_codec_name(codec_name)
    assert isinstance(resolved, compressor_type)


@pytest.mark.parametrize("codec_name", ["zstd", "lz4", "snappy", "bogus"])
def test_from_codec_name_raises_for_unknown_codec(codec_name: str) -> None:
    """Check that an unsupported codec name raises ``ValueError`` naming that codec."""
    # ``match`` is applied as a regular expression; the parametrised names contain
    # no regex metacharacters, so the message is matched literally.
    expected_message = f"Unsupported metadata compression codec: {codec_name}"

    with pytest.raises(ValueError, match=expected_message):
        Compressor.from_codec_name(codec_name)


@pytest.mark.parametrize("codec_name", ["none", "gzip"])
def test_table_metadata_round_trip_with_compression(
    tmp_path: Path, example_table_metadata_v2: dict[str, Any], codec_name: str
) -> None:
    """Write table metadata under a codec-specific file name, then read it back.

    Checks both the raw on-disk representation (gzip stream vs. plain JSON) and
    that the parsed metadata equals what was written.
    """
    from pyiceberg.io.pyarrow import PyArrowFileIO

    metadata = TableMetadataV2(**example_table_metadata_v2)
    file_io = PyArrowFileIO()

    # The file name carries the codec-specific suffix for this codec.
    metadata_path = tmp_path / f"{uuid.uuid4()}{metadata_file_extension(codec_name)}"
    metadata_location = str(metadata_path)

    output_file = file_io.new_output(location=metadata_location)
    ToOutputFile.table_metadata(metadata, output_file, overwrite=True)

    raw_bytes = metadata_path.read_bytes()
    if codec_name != "gzip":
        # Uncompressed metadata is plain JSON text, i.e. it starts with an object.
        assert raw_bytes.lstrip().startswith(b"{")
    else:
        # Java writes gzip-compressed metadata when this codec is set; assert we
        # match by checking the gzip magic bytes rather than relying on the
        # (lossless) round trip.
        assert raw_bytes[:2] == b"\x1f\x8b"

    input_file = file_io.new_input(location=metadata_location)
    parsed_metadata = FromInputFile.table_metadata(input_file)

    assert parsed_metadata == metadata
    assert parsed_metadata.table_uuid == metadata.table_uuid
    assert parsed_metadata.schema() == metadata.schema()


@pytest.mark.parametrize(
    ("table_properties", "expected_suffix"),
    [
        # Property absent.
        ({}, ".metadata.json"),
        ({TableProperties.WRITE_METADATA_COMPRESSION: "none"}, ".metadata.json"),
        ({TableProperties.WRITE_METADATA_COMPRESSION: "gzip"}, ".gz.metadata.json"),
    ],
)
def test_new_table_metadata_file_location_uses_metadata_compression(
    table_properties: dict[str, str], expected_suffix: str
) -> None:
    """Check that the generated metadata file name ends with the suffix expected for the table properties."""
    location_provider = load_location_provider(table_location="table_location", table_properties=table_properties)

    generated_location = location_provider.new_table_metadata_file_location(new_version=3)

    # Version 3 is zero-padded to five digits; the part between this prefix and the
    # suffix is a random UUID, so only the two ends are asserted.
    expected_prefix = "table_location/metadata/00003-"
    assert generated_location.startswith(expected_prefix)
    assert generated_location.endswith(expected_suffix)


def test_new_table_metadata_file_location_raises_for_unknown_compression() -> None:
    """Check that an unsupported compression codec in the table properties raises ``ValueError``."""
    unsupported_properties = {TableProperties.WRITE_METADATA_COMPRESSION: "bogus"}
    location_provider = load_location_provider(
        table_location="table_location", table_properties=unsupported_properties
    )

    # Only the call that builds the file location is placed inside the ``raises``
    # block, so the error must come from that call rather than from creating the
    # provider.
    with pytest.raises(ValueError, match="Unsupported metadata compression codec: bogus"):
        location_provider.new_table_metadata_file_location(new_version=3)