Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 133 additions & 1 deletion pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,31 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import math
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
from pyroaring import BitMap, FrozenBitMap

from pyiceberg import __version__
from pyiceberg.io import OutputFile
from pyiceberg.typedef import IcebergBaseModel

if TYPE_CHECKING:
import pyarrow as pa

# Short for: Puffin Fratercula arctica, version 1
# PuffinWriter.finish() writes these bytes at the start of the file and on both sides of the footer
MAGIC_BYTES = b"PFA1"
# Prefix that PuffinWriter.set_blob() puts in front of the serialized vector inside a deletion-vector-v1 blob
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
# Largest bitmap key accepted by _serialize_bitmaps (2**31 - 1)
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
# Blob property key under which PuffinWriter.set_blob() records the referenced data file location
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
# Reserved field id of the row position (_pos) metadata column, referenced by
# deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION)
ROW_POSITION_FIELD_ID = 2147483645


def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
Expand Down Expand Up @@ -62,6 +71,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
return bitmaps


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
"""
Serialize a dictionary of bitmaps into a byte array.

The format is:
- 8 bytes: number of bitmaps (little-endian)
- For each bitmap:
- 4 bytes: key (little-endian)
- n bytes: serialized bitmap
"""
with io.BytesIO() as out:
sorted_keys = sorted(bitmaps.keys())

# number of bitmaps
out.write(len(sorted_keys).to_bytes(8, "little"))

for key in sorted_keys:
if key < 0:
raise ValueError(f"Invalid unsigned key: {key}")
if key > MAX_JAVA_SIGNED:
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

# key
out.write(key.to_bytes(4, "little"))
# bitmap
out.write(bitmaps[key].serialize())
return out.getvalue()


class PuffinBlobMetadata(IcebergBaseModel):
type: Literal["deletion-vector-v1"] = Field()
fields: list[int] = Field()
Expand All @@ -81,7 +119,10 @@ class Footer(IcebergBaseModel):
def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
    """Convert a list of bitmaps into a single chunked array of 64-bit positions.

    Each bitmap becomes one chunk. A value ``pos`` stored in the bitmap at list
    index ``key_pos`` is emitted as ``(key_pos << 32) + pos``.

    Args:
        bitmaps: Bitmaps ordered by key; the list index is used as the key.

    Returns:
        A chunked array of type int64 with one chunk per bitmap.
    """
    # Imported lazily: at module level pyarrow is only imported for type checking
    import pyarrow as pa

    # The element type is passed explicitly. Without it pyarrow has to infer the type
    # from the values, which it cannot do for an empty chunk or an empty list of chunks.
    return pa.chunked_array(
        ([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps)),
        type=pa.int64(),
    )


class PuffinFile:
Expand Down Expand Up @@ -114,3 +155,94 @@ def __init__(self, puffin: bytes) -> None:

def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
    """Return every stored deletion vector as a chunked array, keyed like ``self._deletion_vectors``."""
    vectors_by_path: dict[str, "pa.ChunkedArray"] = {}
    for data_path, bitmap_list in self._deletion_vectors.items():
        # One chunked array per entry, built from that entry's list of bitmaps
        vectors_by_path[data_path] = _bitmaps_to_chunked_array(bitmap_list)
    return vectors_by_path


class PuffinWriter:
    """Builds a Puffin file holding a single deletion-vector-v1 blob and writes it to an output file."""

    # Destination the finished file is written to
    _output_file: OutputFile
    # Metadata of the blobs to write; parallel to _blob_payloads
    _blobs: list[PuffinBlobMetadata]
    # Fully framed blob bytes; parallel to _blobs
    _blob_payloads: list[bytes]
    # Value stored under the footer's "created-by" property
    _created_by: str

    def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None:
        """Create a writer with no blob set.

        Args:
            output_file: Destination the Puffin file is written to by finish().
            created_by: Value for the footer's "created-by" property. When None, a string
                naming the PyIceberg version is used.
        """
        if created_by is None:
            created_by = f"PyIceberg version {__version__}"

        self._output_file = output_file
        self._created_by = created_by
        self._blobs = []
        self._blob_payloads = []

    def set_blob(
        self,
        positions: Iterable[int],
        referenced_data_file: str,
    ) -> None:
        """Build the deletion vector blob for a data file and make it the writer's only blob.

        Args:
            positions: Zero-based positions of the deleted rows in the referenced data file.
            referenced_data_file: Location of the data file the deletion vector applies to.

        Raises:
            ValueError: If a position is negative, or if no positions are given.
        """
        # Group positions by their upper bits; each group's bitmap stores the lower 32 bits
        bitmaps_by_key: dict[int, BitMap] = {}
        for position in positions:
            if position < 0:
                raise ValueError(f"Invalid position: {position}, positions must be non-negative")
            high_bits = position >> 32
            bucket = bitmaps_by_key.get(high_bits)
            if bucket is None:
                bucket = BitMap()
                bitmaps_by_key[high_bits] = bucket
            bucket.add(position & 0xFFFFFFFF)

        if not bitmaps_by_key:
            raise ValueError("Deletion vector must contain at least one position")

        total_positions = sum(map(len, bitmaps_by_key.values()))

        # deletion-vector-v1 framing, in order:
        #   4 bytes big-endian: combined length of the DV magic and the serialized vector
        #   the DV magic followed by the serialized vector
        #   4 bytes big-endian: CRC-32 computed over magic + vector
        magic_and_vector = DELETION_VECTOR_MAGIC + _serialize_bitmaps(bitmaps_by_key)
        length_prefix = len(magic_and_vector).to_bytes(4, "big")
        checksum = zlib.crc32(magic_and_vector).to_bytes(4, "big")
        framed_blob = b"".join((length_prefix, magic_and_vector, checksum))

        blob_properties = {
            PROPERTY_REFERENCED_DATA_FILE: referenced_data_file,
            "cardinality": str(total_positions),
        }
        metadata = PuffinBlobMetadata(
            type="deletion-vector-v1",
            fields=[ROW_POSITION_FIELD_ID],
            # -1 means the snapshot id and sequence number are inherited at commit time
            snapshot_id=-1,
            sequence_number=-1,
            # Placeholders only: finish() substitutes the real offset and length
            offset=0,
            length=0,
            properties=blob_properties,
            compression_codec=None,
        )

        # State is swapped only after everything above succeeded, so a failing call leaves a
        # previously set blob untouched. Only one blob is supported at the moment.
        self._blobs = [metadata]
        self._blob_payloads = [framed_blob]

    def finish(self) -> int:
        """Assemble the Puffin file, write it to the output file and return its size in bytes."""
        buffer = bytearray(MAGIC_BYTES)

        placed_blobs: list[PuffinBlobMetadata] = []
        for metadata, payload in zip(self._blobs, self._blob_payloads, strict=True):
            # The blob starts at the current end of the buffer
            placed_blobs.append(metadata.model_copy(update={"offset": len(buffer), "length": len(payload)}))
            buffer += payload

        footer = Footer(blobs=placed_blobs, properties={"created-by": self._created_by})
        footer_json = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

        # Footer: magic, JSON payload, payload size (4 bytes LE), flags (4 bytes, all zero), magic
        buffer += MAGIC_BYTES
        buffer += footer_json
        buffer += len(footer_json).to_bytes(4, "little")
        buffer += (0).to_bytes(4, "little")
        buffer += MAGIC_BYTES

        file_bytes = bytes(buffer)

        with self._output_file.create(overwrite=True) as stream:
            stream.write(file_bytes)

        return len(file_bytes)
Loading