Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@

T = TypeVar("T")

POSITIONAL_DELETE_WRITE_SCHEMA = Schema(
NestedField(2147483546, "file_path", StringType()),
NestedField(2147483545, "pos", LongType()),
)


@lru_cache
def _cached_resolve_s3_region(bucket: str) -> str | None:
Expand Down Expand Up @@ -2685,6 +2690,81 @@ def write_parquet(task: WriteTask) -> DataFile:
return iter(data_files)


def _write_position_delete_file(
io: FileIO,
table_metadata: TableMetadata,
write_uuid: uuid.UUID,
counter: itertools.count[int],
partition: Record,
spec_id: int,
delete_rows: Iterable[tuple[str, int]],
) -> DataFile:
"""Write one v2 position-delete Parquet file for a single partition."""
rows = sorted(delete_rows)
if not rows:
raise ValueError("Cannot write an empty position-delete file")

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
row_group_size = property_as_int(
properties=table_metadata.properties,
property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
)
arrow_schema = pa.schema(
[
pa.field("file_path", pa.string(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483546"}),
pa.field("pos", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"2147483545"}),
]
)
arrow_table = pa.Table.from_arrays(
[
pa.array([file_path for file_path, _ in rows], type=pa.string()),
pa.array([pos for _, pos in rows], type=pa.int64()),
],
schema=arrow_schema,
)

location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
file_path = location_provider.new_data_location(
data_file_name=f"00000-{next(counter)}-{write_uuid}-deletes.parquet",
)
fo = io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs) as writer:
writer.write(arrow_table, row_group_size=row_group_size)

stats_columns = compute_statistics_plan(POSITIONAL_DELETE_WRITE_SCHEMA, table_metadata.properties)
# The reader routes a position-delete file to a data file by its exact file_path bound, so the
# bound must be the full, untruncated path rather than the default truncate(16) string metric.
stats_columns[2147483546] = StatisticsCollector(
field_id=2147483546,
iceberg_type=StringType(),
mode=MetricsMode(MetricModeTypes.FULL),
column_name="file_path",
)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=writer.writer.metadata,
stats_columns=stats_columns,
parquet_column_mapping=parquet_path_to_id_mapping(POSITIONAL_DELETE_WRITE_SCHEMA),
)

data_file = DataFile.from_args(
_table_format_version=table_metadata.format_version,
content=DataFileContent.POSITION_DELETES,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=partition,
file_size_in_bytes=len(fo),
sort_order_id=None,
spec_id=spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict(),
)
data_file.spec_id = spec_id
return data_file


def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.

Expand Down
15 changes: 9 additions & 6 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(
NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType())
)
POSITIONAL_DELETE_SCHEMA = Schema(NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", LongType()))


class ManifestFile(Record):
Expand Down Expand Up @@ -1214,11 +1212,13 @@ def __init__(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
self._content = content

def content(self) -> ManifestContent:
return ManifestContent.DATA
return self._content

@property
def version(self) -> TableVersion:
Expand All @@ -1228,7 +1228,7 @@ def version(self) -> TableVersion:
def _meta(self) -> dict[str, str]:
return {
**super()._meta,
"content": "data",
"content": "deletes" if self._content == ManifestContent.DELETES else "data",
}

def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
Expand All @@ -1247,11 +1247,14 @@ def write_manifest(
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
content: ManifestContent = ManifestContent.DATA,
) -> ManifestWriter:
if format_version == 1:
if content != ManifestContent.DATA:
raise ValidationError("Cannot write delete manifests for a v1 table")
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression, content)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down
88 changes: 85 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,15 +716,23 @@ def delete(
"""
from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow

if isinstance(delete_filter, str):
delete_filter = _parse_row_filter(delete_filter)

if (
self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
== TableProperties.DELETE_MODE_MERGE_ON_READ
):
if self.table_metadata.format_version == 2:
self._delete_merge_on_read_position_deletes(
delete_filter=delete_filter,
snapshot_properties=snapshot_properties,
case_sensitive=case_sensitive,
branch=branch,
)
return
warnings.warn("Merge on read is not yet supported, falling back to copy-on-write", stacklevel=2)

if isinstance(delete_filter, str):
delete_filter = _parse_row_filter(delete_filter)

with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

Expand Down Expand Up @@ -790,6 +798,80 @@ def delete(
if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
warnings.warn("Delete operation did not match any records", stacklevel=2)

def _delete_merge_on_read_position_deletes(
self,
delete_filter: BooleanExpression,
snapshot_properties: dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str | None = MAIN_BRANCH,
) -> None:
import pyarrow as pa

from pyiceberg.io.pyarrow import (
ArrowScan,
_read_all_delete_files,
_write_position_delete_file,
expression_to_pyarrow,
)

file_scan = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive)
if branch is not None:
file_scan = file_scan.use_ref(branch)
tasks = list(file_scan.plan_files())

bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
pyarrow_filter = expression_to_pyarrow(bound_delete_filter, self.table_metadata.schema())
deletes_per_file = _read_all_delete_files(self._table.io, tasks)
delete_rows_by_partition: dict[tuple[int, Record], list[tuple[str, int]]] = {}

raw_scan = ArrowScan(
table_metadata=self.table_metadata,
io=self._table.io,
projected_schema=self.table_metadata.schema(),
row_filter=AlwaysTrue(),
case_sensitive=case_sensitive,
)

for task in tasks:
existing_deleted_positions: set[int] = set()
for positions in deletes_per_file.get(task.file.file_path, []):
existing_deleted_positions.update(int(pos) for pos in positions.to_pylist())

current_index = 0
raw_task = FileScanTask(task.file, delete_files=set(), residual=AlwaysTrue())
for batch in raw_scan.to_record_batches([raw_task]):
row_positions = pa.array(range(current_index, current_index + batch.num_rows), type=pa.int64())
current_index += batch.num_rows

batch_with_positions = pa.Table.from_batches([batch]).append_column("__pyiceberg_position", row_positions)
matching_positions = batch_with_positions.filter(pyarrow_filter).column("__pyiceberg_position").to_pylist()
rows_to_delete = [
(task.file.file_path, pos) for pos in matching_positions if pos not in existing_deleted_positions
]

if rows_to_delete:
key = (task.file.spec_id, task.file.partition)
delete_rows_by_partition.setdefault(key, []).extend(rows_to_delete)

if not delete_rows_by_partition:
warnings.warn("Delete operation did not match any records", stacklevel=2)
return

with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
counter = itertools.count(0)
for (spec_id, partition), delete_rows in delete_rows_by_partition.items():
delete_snapshot.append_delete_file(
_write_position_delete_file(
io=self._table.io,
table_metadata=self.table_metadata,
write_uuid=delete_snapshot.commit_uuid,
counter=counter,
partition=partition,
spec_id=spec_id,
delete_rows=delete_rows,
)
)

def upsert(
self,
df: pa.Table,
Expand Down
52 changes: 48 additions & 4 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_snapshot_id: int
_parent_snapshot_id: int | None
_added_data_files: list[DataFile]
_added_delete_files: list[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: set[DataFile]
_compression: AvroCompressionCodec
Expand All @@ -120,6 +121,7 @@ def __init__(
self._operation = operation
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
self._added_data_files = []
self._added_delete_files = []
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
Expand Down Expand Up @@ -148,6 +150,10 @@ def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
return self

def append_delete_file(self, delete_file: DataFile) -> _SnapshotProducer[U]:
self._added_delete_files.append(delete_file)
return self

def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._deleted_data_files.add(data_file)
return self
Expand Down Expand Up @@ -195,6 +201,29 @@ def _write_added_manifest() -> list[ManifestFile]:
else:
return []

def _write_added_delete_manifest() -> list[ManifestFile]:
if self._added_delete_files:
delete_manifests = []
partition_groups: dict[int, list[DataFile]] = defaultdict(list)
for delete_file in self._added_delete_files:
partition_groups[delete_file.spec_id].append(delete_file)
for spec_id, delete_files in partition_groups.items():
with self.new_manifest_writer(spec=self.spec(spec_id), content=ManifestContent.DELETES) as writer:
for delete_file in delete_files:
writer.add(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
sequence_number=None,
file_sequence_number=None,
data_file=delete_file,
)
)
delete_manifests.append(writer.to_manifest_file())
return delete_manifests
else:
return []

def _write_delete_manifest() -> list[ManifestFile]:
# Check if we need to mark the files as deleted
deleted_entries = self._deleted_entries()
Expand All @@ -218,10 +247,13 @@ def _write_delete_manifest() -> list[ManifestFile]:
executor = ExecutorFactory.get_or_create()

added_manifests = executor.submit(_write_added_manifest)
added_delete_manifests = executor.submit(_write_added_delete_manifest)
delete_manifests = executor.submit(_write_delete_manifest)
existing_manifests = executor.submit(self._existing_manifests)

return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
return self._process_manifests(
added_manifests.result() + added_delete_manifests.result() + delete_manifests.result() + existing_manifests.result()
)

def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties
Expand All @@ -230,6 +262,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
table_metadata = self._transaction.table_metadata
schema = table_metadata.schema()
default_spec = table_metadata.spec()
specs = table_metadata.specs()

partition_summary_limit = int(
table_metadata.properties.get(
Expand All @@ -244,9 +277,14 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
partition_spec=default_spec,
schema=schema,
)
for delete_file in self._added_delete_files:
ssc.add_file(
data_file=delete_file,
partition_spec=specs[delete_file.spec_id],
schema=schema,
)

if len(self._deleted_data_files) > 0:
specs = table_metadata.specs()
for data_file in self._deleted_data_files:
ssc.remove_file(
data_file=data_file,
Expand Down Expand Up @@ -339,14 +377,15 @@ def schema(self) -> Schema:
def spec(self, spec_id: int) -> PartitionSpec:
return self._transaction.table_metadata.specs()[spec_id]

def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
def new_manifest_writer(self, spec: PartitionSpec, content: ManifestContent = ManifestContent.DATA) -> ManifestWriter:
return write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=spec,
schema=self.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
content=content,
)

def new_manifest_output(self) -> OutputFile:
Expand Down Expand Up @@ -400,11 +439,16 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):

def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
if self.files_affected:
if self.files_affected or self._added_delete_files:
return super()._commit()
else:
return (), ()

def append_delete_file(self, delete_file: DataFile) -> _DeleteFiles:
self._operation = Operation.OVERWRITE
super().append_delete_file(delete_file)
return self

@cached_property
def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], bool]:
"""Computes all the delete operation and cache it when nothing changes.
Expand Down
Loading