Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
)
from pyiceberg.io.fileformat import DataFileStatistics as DataFileStatistics
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
DataFileContent,
FileFormat,
Expand Down Expand Up @@ -2685,6 +2686,101 @@ def write_parquet(task: WriteTask) -> DataFile:
return iter(data_files)


def write_position_delete_file(
io: FileIO,
table_metadata: TableMetadata,
referenced_data_file: DataFile,
positions: Iterable[int],
write_uuid: uuid.UUID | None = None,
counter: itertools.count[int] | None = None,
) -> DataFile:
"""Write one Iceberg v2 position-delete Parquet file for a single referenced data file.

The delete file's `file_path` column is the constant referenced_data_file.file_path
(verbatim, including any scheme), and `pos` are the deleted row positions, deduplicated
and sorted ascending (spec sort order is (file_path, pos); file_path is constant here).
The returned DataFile has content=POSITION_DELETES, equality_ids=None, partition and
spec_id copied from the referenced data file so it is partition-scoped, and record_count
equal to the number of deleted positions.
"""
if table_metadata.format_version < 2:
raise ValueError(f"Position deletes are only supported for v2+ tables, got v{table_metadata.format_version}")
if referenced_data_file.content != DataFileContent.DATA:
raise ValueError(f"referenced_data_file must be a DATA file, got {referenced_data_file.content}")

try:
spec_id = referenced_data_file.spec_id
except AttributeError:
spec_id = table_metadata.default_spec_id

write_uuid = write_uuid or uuid.uuid4()
counter = counter or itertools.count(0)

delete_positions = sorted({int(position) for position in positions})
if not delete_positions:
raise ValueError("Cannot write an empty position-delete file")
if delete_positions[0] < 0:
raise ValueError("Position-delete positions must be non-negative")

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
row_group_size = property_as_int(
properties=table_metadata.properties,
property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
)
arrow_schema = pa.schema(
[
pa.field("file_path", pa.string(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"}),
pa.field("pos", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"}),
]
)
arrow_table = pa.Table.from_arrays(
[
pa.array([referenced_data_file.file_path] * len(delete_positions), type=pa.string()),
pa.array(delete_positions, type=pa.int64()),
],
schema=arrow_schema,
)

location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
file_path = location_provider.new_data_location(
data_file_name=f"00000-{next(counter)}-{write_uuid}-deletes.parquet",
)
fo = io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs) as writer:
writer.write(arrow_table, row_group_size=row_group_size)

stats_columns = compute_statistics_plan(POSITIONAL_DELETE_SCHEMA, table_metadata.properties)
stats_columns[2147483546] = StatisticsCollector(
field_id=2147483546,
iceberg_type=StringType(),
mode=MetricsMode(MetricModeTypes.FULL),
column_name="file_path",
)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=writer.writer.metadata,
stats_columns=stats_columns,
parquet_column_mapping=parquet_path_to_id_mapping(POSITIONAL_DELETE_SCHEMA),
)

data_file = DataFile.from_args(
_table_format_version=table_metadata.format_version,
content=DataFileContent.POSITION_DELETES,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=referenced_data_file.partition,
file_size_in_bytes=len(fo),
sort_order_id=None,
spec_id=spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict(),
)
data_file.spec_id = spec_id
return data_file


def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.

Expand Down
16 changes: 12 additions & 4 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,10 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType()))
POSITIONAL_DELETE_SCHEMA = Schema(
NestedField(2147483546, "file_path", StringType(), required=True),
NestedField(2147483545, "pos", LongType(), required=True),
)


class ManifestFile(Record):
Expand Down Expand Up @@ -1212,11 +1215,13 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
self._content = content

def content(self) -> ManifestContent:
return ManifestContent.DATA
return self._content

@property
def version(self) -> TableVersion:
Expand All @@ -1226,7 +1231,7 @@ def version(self) -> TableVersion:
def _meta(self) -> dict[str, str]:
return {
**super()._meta,
"content": "data",
"content": "deletes" if self._content == ManifestContent.DELETES else "data",
}

def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
Expand All @@ -1245,11 +1250,14 @@ def write_manifest(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
) -> ManifestWriter:
if format_version == 1:
if content != ManifestContent.DATA:
raise ValidationError("Cannot write delete manifests for a v1 table")
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression, content)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down
94 changes: 91 additions & 3 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_snapshot_id: int
_parent_snapshot_id: int | None
_added_data_files: list[DataFile]
_added_delete_files: list[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: set[DataFile]
_compression: AvroCompressionCodec
Expand All @@ -120,6 +121,7 @@ def __init__(
self._operation = operation
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
self._added_data_files = []
self._added_delete_files = []
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
Expand Down Expand Up @@ -195,6 +197,29 @@ def _write_added_manifest() -> list[ManifestFile]:
else:
return []

def _write_added_delete_manifest() -> list[ManifestFile]:
if self._added_delete_files:
added_delete_manifests = []
partition_groups: dict[int, list[DataFile]] = defaultdict(list)
for delete_file in self._added_delete_files:
partition_groups[delete_file.spec_id].append(delete_file)
for spec_id, delete_files in partition_groups.items():
with self.new_manifest_writer(spec=self.spec(spec_id), content=ManifestContent.DELETES) as writer:
for delete_file in delete_files:
writer.add(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
sequence_number=None,
file_sequence_number=None,
data_file=delete_file,
)
)
added_delete_manifests.append(writer.to_manifest_file())
return added_delete_manifests
else:
return []

def _write_delete_manifest() -> list[ManifestFile]:
# Check if we need to mark the files as deleted
deleted_entries = self._deleted_entries()
Expand All @@ -218,10 +243,13 @@ def _write_delete_manifest() -> list[ManifestFile]:
executor = ExecutorFactory.get_or_create()

added_manifests = executor.submit(_write_added_manifest)
added_delete_manifests = executor.submit(_write_added_delete_manifest)
delete_manifests = executor.submit(_write_delete_manifest)
existing_manifests = executor.submit(self._existing_manifests)

return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
return self._process_manifests(
added_manifests.result() + added_delete_manifests.result() + delete_manifests.result() + existing_manifests.result()
)

def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties
Expand All @@ -245,8 +273,15 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
schema=schema,
)

specs = table_metadata.specs()
for delete_file in self._added_delete_files:
ssc.add_file(
data_file=delete_file,
partition_spec=specs[delete_file.spec_id],
schema=schema,
)

if len(self._deleted_data_files) > 0:
specs = table_metadata.specs()
for data_file in self._deleted_data_files:
ssc.remove_file(
data_file=data_file,
Expand Down Expand Up @@ -339,14 +374,15 @@ def schema(self) -> Schema:
def spec(self, spec_id: int) -> PartitionSpec:
return self._transaction.table_metadata.specs()[spec_id]

def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
def new_manifest_writer(self, spec: PartitionSpec, content: ManifestContent = ManifestContent.DATA) -> ManifestWriter:
return write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=spec,
schema=self.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
content=content,
)

def new_manifest_output(self) -> OutputFile:
Expand Down Expand Up @@ -529,6 +565,48 @@ def _deleted_entries(self) -> list[ManifestEntry]:
return []


class _RowDeltaFiles(_SnapshotProducer["_RowDeltaFiles"]):
"""Commits appended data files AND position-delete files as ONE row-delta (OVERWRITE) snapshot.

Existing manifests are carried forward untouched (no copy-on-write rewrite); the existing
positional-delete reader applies the new delete files. Delete-file sequence numbers are left
unassigned and stamped at commit, so delete seq == data seq == snapshot seq (a true row delta).
"""

def append_delete_file(self, delete_file: DataFile) -> _RowDeltaFiles:
if delete_file.content != DataFileContent.POSITION_DELETES:
raise ValueError(f"append_delete_file requires a position-delete file, got {delete_file.content}")
self._added_delete_files.append(delete_file)
return self

def _existing_manifests(self) -> list[ManifestFile]:
"""Carry forward all existing manifests from the parent snapshot, like _FastAppendFiles."""
existing_manifests = []

if self._parent_snapshot_id is not None:
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)

if previous_snapshot is None:
raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")

for manifest in previous_snapshot.manifests(io=self._io):
if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
existing_manifests.append(manifest)

return existing_manifests

def _deleted_entries(self) -> list[ManifestEntry]:
return []

def _commit(self) -> UpdatesAndRequirements:
if self._added_data_files or self._added_delete_files:
return super()._commit()
return (), ()


RowDeltaSnapshotProducer = _RowDeltaFiles


class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
Expand Down Expand Up @@ -720,6 +798,16 @@ def overwrite(self, commit_uuid: uuid.UUID | None = None) -> _OverwriteFiles:
snapshot_properties=self._snapshot_properties,
)

def row_delta(self, commit_uuid: uuid.UUID | None = None) -> _RowDeltaFiles:
return _RowDeltaFiles(
operation=Operation.OVERWRITE,
transaction=self._transaction,
io=self._io,
commit_uuid=commit_uuid,
snapshot_properties=self._snapshot_properties,
branch=self._branch,
)

def delete(self) -> _DeleteFiles:
return _DeleteFiles(
operation=Operation.DELETE,
Expand Down
Loading