diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 076098c757..4c9a036a96 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1146,6 +1146,15 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray] raise ValueError(f"Delete file format not supported: {data_file.file_format}") +def _read_equality_delete(io: FileIO, data_file: DataFile) -> pa.Table: + if data_file.file_format in {FileFormat.PARQUET, FileFormat.ORC}: + with io.new_input(data_file.file_path).open() as fi: + delete_fragment = _get_file_format(data_file.file_format, pre_buffer=True, buffer_size=ONE_MEGABYTE).make_fragment(fi) + return ds.Scanner.from_fragment(fragment=delete_fragment).to_table().combine_chunks() + else: + raise ValueError(f"Equality delete file format not supported: {data_file.file_format}") + + def _combine_positional_deletes(positional_deletes: list[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array: if len(positional_deletes) == 1: all_chunks = positional_deletes[0] @@ -1164,6 +1173,178 @@ def _combine_positional_deletes(positional_deletes: list[pa.ChunkedArray], start return pc.subtract(result, pa.scalar(start_index)) +def _equality_delete_key_names( + equality_ids: list[int], + data_schema: Schema, + delete_table: pa.Table, + table_schema: Schema, + downcast_ns_timestamp_to_us: bool, + format_version: TableVersion, +) -> tuple[list[str], list[str]]: + delete_schema = pyarrow_to_schema( + delete_table.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version + ) + data_names: list[str] = [] + for field_id in equality_ids: + try: + data_name = data_schema.find_field(field_id).name + except ValueError: + data_name = table_schema.find_field(field_id).name + data_names.append(data_name) + + delete_names: list[str] = [] + for field_id, data_name in zip(equality_ids, data_names, strict=True): + try: + delete_name = delete_schema.find_field(field_id).name + except ValueError: + delete_name = table_schema.find_field(field_id).name + if delete_name not in delete_table.column_names and data_name in delete_table.column_names: + delete_name = data_name + delete_names.append(delete_name) + + return data_names, delete_names + + +def _integer_type_bounds(arrow_type: pa.DataType) -> tuple[int, int]: + bit_width = arrow_type.bit_width + if pa.types.is_unsigned_integer(arrow_type): + return 0, (2**bit_width) - 1 + return -(2 ** (bit_width - 1)), (2 ** (bit_width - 1)) - 1 + + +def _cast_equality_delete_key_column(column: pa.ChunkedArray, data_type: pa.DataType) -> pa.ChunkedArray: + if column.type == data_type: + return column + + try: + return column.cast(data_type) + except pa.lib.ArrowInvalid: + if not (pa.types.is_integer(column.type) and pa.types.is_integer(data_type)): + raise + + source_min, source_max = _integer_type_bounds(column.type) + target_min, target_max = _integer_type_bounds(data_type) + if target_min > source_max or target_max < source_min: + return pa.chunked_array([pa.nulls(len(chunk), type=data_type) for chunk in column.chunks]) + + in_range = pc.is_valid(column) + if target_min > source_min: + in_range = pc.and_(in_range, pc.greater_equal(column, pa.scalar(target_min, type=column.type))) + if target_max < source_max: + in_range = pc.and_(in_range, pc.less_equal(column, pa.scalar(target_max, type=column.type))) + + column = pc.if_else(in_range, column, pa.scalar(None, type=column.type)) + return column.cast(data_type) + + +def _materialize_missing_equality_delete_columns( + data_table: pa.Table, equality_ids: list[int], data_names: list[str], data_schema: Schema, table_schema: Schema +) -> pa.Table: + for field_id, data_name in zip(equality_ids, data_names, strict=True): + try: + data_schema.find_field(field_id) + except ValueError: + if data_name not in data_table.column_names: + table_field = table_schema.find_field(field_id) + arrow_type = schema_to_pyarrow(table_field.field_type) + data_table = data_table.append_column( + pa.field(data_name, arrow_type), pa.nulls(data_table.num_rows, type=arrow_type) + ) + + return data_table + + +def _equality_delete_key_table( + delete_table: pa.Table, data_table: pa.Table, data_names: list[str], delete_names: list[str] +) -> pa.Table: + arrays: list[pa.ChunkedArray] = [] + for data_name, delete_name in zip(data_names, delete_names, strict=True): + column = delete_table.column(delete_name) + data_type = data_table.schema.field(data_name).type + column = _cast_equality_delete_key_column(column, data_type) + arrays.append(column) + + return pa.Table.from_arrays(arrays, names=data_names) + + +def _has_null_equality_key(key_table: pa.Table, key_names: list[str]) -> pa.ChunkedArray: + null_key_mask = pc.is_null(key_table.column(key_names[0])) + for key_name in key_names[1:]: + null_key_mask = pc.or_(null_key_mask, pc.is_null(key_table.column(key_name))) + return null_key_mask + + +def _apply_null_equality_delete_rows(data_table: pa.Table, delete_key_table: pa.Table, key_names: list[str]) -> pa.Table: + if data_table.num_rows == 0 or delete_key_table.num_rows == 0: + return data_table + + data_table = data_table.combine_chunks() + delete_key_table = delete_key_table.combine_chunks() + match = pa.array([False] * data_table.num_rows) + + for row_idx in range(delete_key_table.num_rows): + row_match = pa.array([True] * data_table.num_rows) + for key_name in key_names: + delete_value = delete_key_table.column(key_name)[row_idx] + data_column = data_table.column(key_name) + column_match = ( + pc.fill_null(pc.equal(data_column, delete_value), False) if delete_value.is_valid else pc.is_null(data_column) + ) + row_match = pc.and_(row_match, column_match) + match = pc.or_(match, row_match) + + return data_table.filter(pc.invert(match)) + + +def _apply_equality_deletes( + batch: pa.RecordBatch, + file_project_schema: Schema, + table_schema: Schema, + equality_deletes: list[tuple[list[int], pa.Table]] | None, + downcast_ns_timestamp_to_us: bool, + format_version: TableVersion, +) -> pa.RecordBatch: + if not equality_deletes or batch.num_rows == 0: + return batch + + data_table = pa.Table.from_batches([batch]) + for equality_ids, delete_table in equality_deletes: + if not equality_ids: + raise ValueError("Equality delete file is missing required equality_ids") + if delete_table.num_rows == 0: + continue + + data_names, delete_names = _equality_delete_key_names( + equality_ids, + file_project_schema, + delete_table, + table_schema, + downcast_ns_timestamp_to_us, + format_version, + ) + data_table = _materialize_missing_equality_delete_columns( + data_table, equality_ids, data_names, file_project_schema, table_schema + ) + delete_key_table = _equality_delete_key_table(delete_table, data_table, data_names, delete_names) + null_key_mask = _has_null_equality_key(delete_key_table, data_names) + non_null_delete_keys = delete_key_table.filter(pc.invert(null_key_mask)) + null_delete_keys = delete_key_table.filter(null_key_mask) + + if non_null_delete_keys.num_rows > 0: + data_table = data_table.join(non_null_delete_keys, keys=data_names, join_type="left anti") + + # PyArrow's anti-join uses SQL null semantics. Iceberg equality deletes use + # IS NOT DISTINCT FROM, so null-key delete rows need an explicit null-aware pass. + # A fully vectorized null-aware anti-join is a production follow-up. + if null_delete_keys.num_rows > 0: + data_table = _apply_null_equality_delete_rows(data_table, null_delete_keys, data_names) + + if data_table.num_rows == 0: + return batch.slice(0, 0) + + return data_table.combine_chunks().to_batches()[0] + + def pyarrow_to_schema( schema: pa.Schema, name_mapping: NameMapping | None = None, @@ -1621,6 +1802,7 @@ def _task_to_record_batches( projected_field_ids: set[int], positional_deletes: list[ChunkedArray] | None, case_sensitive: bool, + equality_deletes: list[tuple[list[int], pa.Table]] | None = None, name_mapping: NameMapping | None = None, partition_spec: PartitionSpec | None = None, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, @@ -1691,6 +1873,15 @@ def _task_to_record_batches( else: current_batch = table.combine_chunks().to_batches()[0] + current_batch = _apply_equality_deletes( + current_batch, + file_project_schema, + table_schema, + equality_deletes, + downcast_ns_timestamp_to_us, + format_version, + ) + # skip empty batches if current_batch.num_rows == 0: continue @@ -1705,14 +1896,23 @@ def _task_to_record_batches( ) -def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]: +def _read_all_delete_files( + io: FileIO, tasks: Iterable[FileScanTask] +) -> tuple[dict[str, list[ChunkedArray]], dict[str, pa.Table]]: + tasks = list(tasks) deletes_per_file: dict[str, list[ChunkedArray]] = {} - unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks])) - if len(unique_deletes) > 0: + equality_deletes_per_file: dict[str, pa.Table] = {} + unique_positional_deletes = { + delete_file + for task in tasks + for delete_file in task.delete_files + if delete_file.content == DataFileContent.POSITION_DELETES + } + if len(unique_positional_deletes) > 0: executor = ExecutorFactory.get_or_create() deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map( lambda args: _read_deletes(*args), - [(io, delete_file) for delete_file in unique_deletes], + [(io, delete_file) for delete_file in unique_positional_deletes], ) for delete in deletes_per_files: for file, arr in delete.items(): @@ -1721,7 +1921,21 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st else: deletes_per_file[file] = [arr] - return deletes_per_file + unique_equality_deletes = { + delete_file + for task in tasks + for delete_file in task.delete_files + if delete_file.content == DataFileContent.EQUALITY_DELETES + } + if len(unique_equality_deletes) > 0: + executor = ExecutorFactory.get_or_create() + equality_delete_files: Iterator[tuple[str, pa.Table]] = executor.map( + lambda args: (args[1].file_path, _read_equality_delete(*args)), + [(io, delete_file) for delete_file in unique_equality_deletes], + ) + equality_deletes_per_file.update(equality_delete_files) + + return deletes_per_file, equality_deletes_per_file class ArrowScan: @@ -1826,7 +2040,8 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record ResolveError: When a required field cannot be found in the file ValueError: When a field type in the file cannot be projected to the schema type """ - deletes_per_file = _read_all_delete_files(self._io, tasks) + tasks = list(tasks) + deletes_per_file, equality_deletes_per_file = _read_all_delete_files(self._io, tasks) total_row_count = 0 executor = ExecutorFactory.get_or_create() @@ -1835,7 +2050,7 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]: # Materialize the iterator here to ensure execution happens within the executor. # Otherwise, the iterator would be lazily consumed later (in the main thread), # defeating the purpose of using executor.map. - return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)) + return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, equality_deletes_per_file)) limit_reached = False for batches in executor.map(batches_for_task, tasks): @@ -1855,21 +2070,34 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]: break def _record_batches_from_scan_tasks_and_deletes( - self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]] + self, + tasks: Iterable[FileScanTask], + deletes_per_file: dict[str, list[ChunkedArray]], + equality_deletes_per_file: dict[str, pa.Table], ) -> Iterator[pa.RecordBatch]: total_row_count = 0 for task in tasks: if self._limit is not None and total_row_count >= self._limit: break + equality_deletes = [ + (delete_file.equality_ids or [], equality_deletes_per_file[delete_file.file_path]) + for delete_file in task.delete_files + if delete_file.content == DataFileContent.EQUALITY_DELETES + ] + projected_field_ids = set(self._projected_field_ids) + for equality_ids, _ in equality_deletes: + projected_field_ids.update(equality_ids) + batches = _task_to_record_batches( self._io, task, self._bound_row_filter, self._projected_schema, self._table_metadata.schema(), - self._projected_field_ids, + projected_field_ids, deletes_per_file.get(task.file.file_path), self._case_sensitive, + equality_deletes, self._table_metadata.name_mapping(), self._table_metadata.specs().get(task.file.spec_id), self._table_metadata.format_version, diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 597f62632f..38763eb094 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2009,21 +2009,13 @@ def from_rest_response( Returns: A FileScanTask with the converted data and delete files. - - Raises: - NotImplementedError: If equality delete files are encountered. """ - from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile - data_file = _rest_file_to_data_file(rest_task.data_file) resolved_deletes: set[DataFile] = set() if rest_task.delete_file_references: for idx in rest_task.delete_file_references: - delete_file = delete_files[idx] - if isinstance(delete_file, RESTEqualityDeleteFile): - raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {delete_file.file_path}") - resolved_deletes.add(_rest_file_to_data_file(delete_file)) + resolved_deletes.add(_rest_file_to_data_file(delete_files[idx])) return FileScanTask( data_file=data_file, @@ -2034,7 +2026,7 @@ def from_rest_response( def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: """Convert a REST content file to a manifest DataFile.""" - from pyiceberg.catalog.rest.scan_planning import RESTDataFile + from pyiceberg.catalog.rest.scan_planning import RESTDataFile, RESTEqualityDeleteFile if isinstance(rest_file, RESTDataFile): column_sizes = rest_file.column_sizes.to_dict() if rest_file.column_sizes else None @@ -2047,6 +2039,8 @@ def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: null_value_counts = None nan_value_counts = None + equality_ids = rest_file.equality_ids if isinstance(rest_file, RESTEqualityDeleteFile) else None + data_file = DataFile.from_args( content=DataFileContent.from_rest_type(rest_file.content), file_path=rest_file.file_path, @@ -2058,6 +2052,7 @@ def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: value_counts=value_counts, null_value_counts=null_value_counts, nan_value_counts=nan_value_counts, + equality_ids=equality_ids, split_offsets=rest_file.split_offsets, sort_order_id=rest_file.sort_order_id, ) @@ -2335,7 +2330,7 @@ def plan_files(self, manifests: Iterable[ManifestFile]) -> Iterable[FileScanTask List of FileScanTasks that contain both data and delete files. """ data_entries: list[ManifestEntry] = [] - delete_index = DeleteFileIndex() + delete_index = DeleteFileIndex(self.table_metadata.schema()) residual_evaluators: dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) @@ -2346,7 +2341,7 @@ def plan_files(self, manifests: Iterable[ManifestFile]) -> Iterable[FileScanTask elif data_file.content == DataFileContent.POSITION_DELETES: delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition) elif data_file.content == DataFileContent.EQUALITY_DELETES: - raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") + delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition) else: raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py index 3f513aabe5..7359ce0e74 100644 --- a/pyiceberg/table/delete_file_index.py +++ b/pyiceberg/table/delete_file_index.py @@ -17,12 +17,17 @@ from __future__ import annotations from bisect import bisect_left +from typing import TYPE_CHECKING +from pyiceberg.conversions import from_bytes from pyiceberg.expressions import EqualTo from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator -from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry +from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestEntry from pyiceberg.typedef import Record +if TYPE_CHECKING: + from pyiceberg.schema import Schema + PATH_FIELD_ID = 2147483546 @@ -59,6 +64,14 @@ def referenced_delete_files(self) -> list[DataFile]: return [data_file for data_file, _ in self._files] +class EqualityDeletes(PositionDeletes): + """Collects equality delete files and indexes them by sequence number.""" + + def add(self, delete_file: DataFile, seq_num: int) -> None: + # Equality deletes apply only to data files with a strictly smaller sequence number. + super().add(delete_file, seq_num - 1) + + def _has_path_bounds(delete_file: DataFile) -> bool: lower = delete_file.lower_bounds upper = delete_file.upper_bounds @@ -76,6 +89,58 @@ def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool: return evaluator.eval(delete_file) +def _is_all_null(data_file: DataFile, field_id: int) -> bool: + null_counts = data_file.null_value_counts + value_counts = data_file.value_counts + if not null_counts or not value_counts: + return False + null_count = null_counts.get(field_id) + value_count = value_counts.get(field_id) + return null_count is not None and value_count is not None and null_count == value_count + + +def _has_no_nulls(data_file: DataFile, field_id: int) -> bool: + null_counts = data_file.null_value_counts + if not null_counts: + return False + return null_counts.get(field_id) == 0 + + +def _eq_applies_to_data_file(eq_delete_file: DataFile, data_file: DataFile, schema: Schema) -> bool: + if not eq_delete_file.equality_ids: + return True + + for field_id in eq_delete_file.equality_ids: + if _is_all_null(data_file, field_id) and _has_no_nulls(eq_delete_file, field_id): + return False + if _is_all_null(eq_delete_file, field_id) and _has_no_nulls(data_file, field_id): + return False + + if ( + eq_delete_file.lower_bounds + and eq_delete_file.upper_bounds + and data_file.lower_bounds + and data_file.upper_bounds + and field_id in eq_delete_file.lower_bounds + and field_id in eq_delete_file.upper_bounds + and field_id in data_file.lower_bounds + and field_id in data_file.upper_bounds + ): + field_type = schema.find_type(field_id) + if not field_type.is_primitive: + continue + + eq_lower = from_bytes(field_type, eq_delete_file.lower_bounds[field_id]) + eq_upper = from_bytes(field_type, eq_delete_file.upper_bounds[field_id]) + data_lower = from_bytes(field_type, data_file.lower_bounds[field_id]) + data_upper = from_bytes(field_type, data_file.upper_bounds[field_id]) + + if eq_upper < data_lower or eq_lower > data_upper: + return False + + return True + + def _referenced_data_file_path(delete_file: DataFile) -> str | None: """Return the path, if the path bounds evaluate to the same location.""" lower_bounds = delete_file.lower_bounds @@ -103,26 +168,33 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record] class DeleteFileIndex: - """Indexes position delete files by partition and by exact data file path.""" + """Indexes position and equality delete files by partition and by exact data file path.""" - def __init__(self) -> None: + def __init__(self, schema: Schema | None = None) -> None: + self._schema = schema self._by_partition: dict[tuple[int, Record], PositionDeletes] = {} self._by_path: dict[str, PositionDeletes] = {} + self._eq_deletes: dict[tuple[int, Record] | None, EqualityDeletes] = {} def is_empty(self) -> bool: - return not self._by_partition and not self._by_path + return not self._by_partition and not self._by_path and not self._eq_deletes def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None: delete_file = manifest_entry.data_file seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER - target_path = _referenced_data_file_path(delete_file) - if target_path: - deletes = self._by_path.setdefault(target_path, PositionDeletes()) - deletes.add(delete_file, seq) - else: - key = _partition_key(delete_file.spec_id or 0, partition_key) - deletes = self._by_partition.setdefault(key, PositionDeletes()) + if delete_file.content == DataFileContent.POSITION_DELETES: + target_path = _referenced_data_file_path(delete_file) + if target_path: + deletes = self._by_path.setdefault(target_path, PositionDeletes()) + deletes.add(delete_file, seq) + else: + key = _partition_key(delete_file.spec_id or 0, partition_key) + deletes = self._by_partition.setdefault(key, PositionDeletes()) + deletes.add(delete_file, seq) + elif delete_file.content == DataFileContent.EQUALITY_DELETES: + eq_key: tuple[int, Record] | None = _partition_key(delete_file.spec_id or 0, partition_key) if partition_key else None + deletes = self._eq_deletes.setdefault(eq_key, EqualityDeletes()) deletes.add(delete_file, seq) def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]: @@ -133,15 +205,29 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record spec_id = data_file.spec_id or 0 key = _partition_key(spec_id, partition_key) - partition_deletes = self._by_partition.get(key) - if partition_deletes: - for delete_file in partition_deletes.filter_by_seq(seq_num): + partition_pos_deletes = self._by_partition.get(key) + if partition_pos_deletes: + for delete_file in partition_pos_deletes.filter_by_seq(seq_num): if _applies_to_data_file(delete_file, data_file): deletes.add(delete_file) - path_deletes = self._by_path.get(data_file.file_path) - if path_deletes: - deletes.update(path_deletes.filter_by_seq(seq_num)) + path_pos_deletes = self._by_path.get(data_file.file_path) + if path_pos_deletes: + deletes.update(path_pos_deletes.filter_by_seq(seq_num)) + + candidate_eq_deletes: list[DataFile] = [] + partition_eq_deletes = self._eq_deletes.get(key) + if partition_eq_deletes: + candidate_eq_deletes.extend(partition_eq_deletes.filter_by_seq(seq_num)) + + global_eq_deletes = self._eq_deletes.get(None) + if global_eq_deletes: + candidate_eq_deletes.extend(global_eq_deletes.filter_by_seq(seq_num)) + + for eq_delete_file in candidate_eq_deletes: + if self._schema and not _eq_applies_to_data_file(eq_delete_file, data_file, self._schema): + continue + deletes.add(eq_delete_file) return deletes @@ -154,4 +240,7 @@ def referenced_delete_files(self) -> list[DataFile]: for deletes in self._by_path.values(): data_files.extend(deletes.referenced_delete_files()) + for deletes in self._eq_deletes.values(): + data_files.extend(deletes.referenced_delete_files()) + return data_files diff --git a/tests/catalog/test_scan_planning_models.py b/tests/catalog/test_scan_planning_models.py index f2c80cfb9b..3fc191a030 100644 --- a/tests/catalog/test_scan_planning_models.py +++ b/tests/catalog/test_scan_planning_models.py @@ -38,7 +38,7 @@ ValueMap, ) from pyiceberg.expressions import AlwaysTrue, EqualTo, Reference -from pyiceberg.manifest import FileFormat +from pyiceberg.manifest import DataFileContent, FileFormat TEST_URI = "https://iceberg-test-catalog/" @@ -545,7 +545,7 @@ def test_plan_scan_cancelled(rest_scan_catalog: RestCatalog, requests_mock: Mock list(rest_scan_catalog.plan_scan(("db", "tbl"), request)) -def test_plan_scan_equality_deletes_not_supported(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None: +def test_plan_scan_equality_deletes(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None: file_one = _rest_data_file(file_path="s3://bucket/tbl/data/file1.parquet") equality_delete = _rest_equality_delete_file(equality_ids=[1, 2]) requests_mock.post( @@ -566,5 +566,10 @@ def test_plan_scan_equality_deletes_not_supported(rest_scan_catalog: RestCatalog ) request = PlanTableScanRequest() - with pytest.raises(NotImplementedError, match="PyIceberg does not yet support equality deletes"): - list(rest_scan_catalog.plan_scan(("db", "tbl"), request)) + tasks = list(rest_scan_catalog.plan_scan(("db", "tbl"), request)) + + assert len(tasks) == 1 + assert len(tasks[0].delete_files) == 1 + resolved_delete = next(iter(tasks[0].delete_files)) + assert resolved_delete.content == DataFileContent.EQUALITY_DELETES + assert resolved_delete.equality_ids == [1, 2] diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 407ec611fd..c8fc28dc9d 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -69,6 +69,7 @@ PyArrowFile, PyArrowFileIO, StatsAggregator, + _apply_equality_deletes, _check_pyarrow_schema_compatible, _ConvertToArrowSchema, _determine_partitions, @@ -1865,6 +1866,285 @@ def test_delete(deletes_file: str, request: pytest.FixtureRequest, table_schema_ assert str(with_deletes) == expected_str +def _scan_with_equality_delete( + tmp_path: Path, + schema: Schema, + data_rows: list[dict[str, Any]], + equality_ids: list[int], + delete_rows: list[dict[str, Any]], + delete_key_fields: list[tuple[str, pa.DataType, int]], + data_schema: Schema | None = None, +) -> pa.Table: + write_schema = data_schema or schema + data_arrow_schema = schema_to_pyarrow(write_schema, metadata={ICEBERG_SCHEMA: bytes(write_schema.model_dump_json(), UTF8)}) + data_table = pa.Table.from_pylist(data_rows, schema=data_arrow_schema) + data_file_path = str(tmp_path / "data.parquet") + _write_table_to_file(data_file_path, data_arrow_schema, data_table) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=data_table.num_rows, + file_size_in_bytes=os.path.getsize(data_file_path), + ) + data_file.spec_id = 0 + + delete_schema = pa.schema( + [ + pa.field(name, field_type, metadata={PYARROW_PARQUET_FIELD_ID_KEY: bytes(str(field_id), UTF8)}) + for name, field_type, field_id in delete_key_fields + ] + ) + delete_table = pa.Table.from_pylist(delete_rows, schema=delete_schema) + delete_file_path = str(tmp_path / "eq-delete.parquet") + _write_table_to_file(delete_file_path, delete_schema, delete_table) + + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=delete_table.num_rows, + file_size_in_bytes=os.path.getsize(delete_file_path), + equality_ids=equality_ids, + ) + delete_file.spec_id = 0 + + task = FileScanTask(data_file=data_file, delete_files={delete_file}) + return ArrowScan( + table_metadata=TableMetadataV2( + location=f"file://{tmp_path}", + last_column_id=max(schema.field_ids), + format_version=2, + current_schema_id=schema.schema_id, + schemas=[schema], + partition_specs=[PartitionSpec()], + ), + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), + ).to_table([task]) + + +def test_equality_delete_single_column(tmp_path: Path) -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_rows=[{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}], + equality_ids=[1], + delete_rows=[{"id": 2}, {"id": 4}], + delete_key_fields=[("id", pa.int32(), 1)], + ) + + assert result.column("id").to_pylist() == [1, 3] + + +def test_equality_delete_multi_column(tmp_path: Path) -> None: + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "region", StringType(), required=True), + NestedField(3, "val", StringType(), required=True), + ) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_rows=[ + {"id": 1, "region": "us", "val": "drop"}, + {"id": 1, "region": "eu", "val": "keep-eu"}, + {"id": 2, "region": "us", "val": "keep-us"}, + ], + equality_ids=[1, 2], + delete_rows=[{"id": 1, "region": "us"}], + delete_key_fields=[("id", pa.int32(), 1), ("region", pa.string(), 2)], + ) + + assert result.to_pydict() == {"id": [1, 2], "region": ["eu", "us"], "val": ["keep-eu", "keep-us"]} + + +def test_equality_delete_no_match(tmp_path: Path) -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_rows=[{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}], + equality_ids=[1], + delete_rows=[{"id": 2}, {"id": 99}], + delete_key_fields=[("id", pa.int32(), 1)], + ) + + assert result.column("id").to_pylist() == [1, 3, 4] + + +def test_equality_delete_missing_equality_ids_raises() -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + arrow_schema = schema_to_pyarrow(schema) + batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) + delete_table = pa.Table.from_arrays([pa.array([2], type=pa.int32())], names=["id"]) + + with pytest.raises(ValueError, match="equality_ids"): + _apply_equality_deletes( + batch, + schema, + schema, + [([], delete_table)], + downcast_ns_timestamp_to_us=False, + format_version=TableProperties.DEFAULT_FORMAT_VERSION, + ) + + +def test_equality_delete_missing_column_in_data_file_is_all_null(tmp_path: Path) -> None: + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "category", StringType(), required=False), + ) + data_schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_schema=data_schema, + data_rows=[{"id": 1}, {"id": 2}, {"id": 3}], + equality_ids=[2], + delete_rows=[{"category": "delete"}], + delete_key_fields=[("category", pa.string(), 2)], + ) + + assert result.to_pydict() == {"id": [1, 2, 3], "category": [None, None, None]} + + +def test_equality_delete_overflowing_promoted_delete_key_does_not_match(tmp_path: Path) -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_rows=[{"id": 1}, {"id": 2}, {"id": 3}], + equality_ids=[1], + delete_rows=[{"id": 2**40}], + delete_key_fields=[("id", pa.int64(), 1)], + ) + + assert result.column("id").to_pylist() == [1, 2, 3] + + +def test_equality_delete_with_positional_delete(tmp_path: Path) -> None: + import pyarrow.parquet as pq + + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + data_arrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}) + data_table = pa.Table.from_pylist([{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}], schema=data_arrow_schema) + data_file_path = str(tmp_path / "data.parquet") + _write_table_to_file(data_file_path, data_arrow_schema, data_table) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=data_table.num_rows, + file_size_in_bytes=os.path.getsize(data_file_path), + ) + data_file.spec_id = 0 + + # Positional delete removes row at index 0 (id=1). + pos_delete_path = str(tmp_path / "pos-delete.parquet") + pq.write_table(pa.table({"file_path": [data_file_path], "pos": [0]}), pos_delete_path) + pos_delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=pos_delete_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=1, + file_size_in_bytes=os.path.getsize(pos_delete_path), + ) + pos_delete_file.spec_id = 0 + + # Equality delete removes id=3 by value. + eq_delete_schema = pa.schema([pa.field("id", pa.int32(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: bytes("1", UTF8)})]) + eq_delete_table = pa.Table.from_pylist([{"id": 3}], schema=eq_delete_schema) + eq_delete_path = str(tmp_path / "eq-delete.parquet") + _write_table_to_file(eq_delete_path, eq_delete_schema, eq_delete_table) + eq_delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=eq_delete_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=1, + file_size_in_bytes=os.path.getsize(eq_delete_path), + equality_ids=[1], + ) + eq_delete_file.spec_id = 0 + + task = FileScanTask(data_file=data_file, delete_files={pos_delete_file, eq_delete_file}) + result = ArrowScan( + table_metadata=TableMetadataV2( + location=f"file://{tmp_path}", + last_column_id=max(schema.field_ids), + format_version=2, + current_schema_id=schema.schema_id, + schemas=[schema], + partition_specs=[PartitionSpec()], + ), + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), + ).to_table([task]) + + # id=1 removed positionally, id=3 removed by equality; 2 and 4 survive. + assert result.column("id").to_pylist() == [2, 4] + + +def test_equality_delete_multi_batch(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + num_rows = 200_000 + delete_ids = {0, 50_000, 199_999} + batch_sizes: list[int] = [] + + def apply_equality_deletes_with_batch_tracking(batch: pa.RecordBatch, *args: Any, **kwargs: Any) -> pa.RecordBatch: + batch_sizes.append(batch.num_rows) + return _apply_equality_deletes(batch, *args, **kwargs) + + monkeypatch.setattr("pyiceberg.io.pyarrow._apply_equality_deletes", apply_equality_deletes_with_batch_tracking) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_rows=[{"id": row_id} for row_id in range(num_rows)], + equality_ids=[1], + delete_rows=[{"id": row_id} for row_id in delete_ids], + delete_key_fields=[("id", pa.int32(), 1)], + ) + + ids = result.column("id").to_pylist() + + assert len(batch_sizes) > 1 + assert sum(batch_sizes) == num_rows + assert result.num_rows == num_rows - len(delete_ids) + assert delete_ids.isdisjoint(ids) + assert {1, 49_999, 50_001, 199_998}.issubset(ids) + + +def test_equality_delete_null_key(tmp_path: Path) -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + + result = _scan_with_equality_delete( + tmp_path=tmp_path, + schema=schema, + data_rows=[{"id": 1}, {"id": None}, {"id": 3}], + equality_ids=[1], + delete_rows=[{"id": None}], + delete_key_fields=[("id", pa.int32(), 1)], + ) + + assert result.column("id").to_pylist() == [1, 3] + + def test_delete_duplicates(deletes_file: str, request: pytest.FixtureRequest, table_schema_simple: Schema) -> None: # Determine file format from the file extension file_format = FileFormat.PARQUET if deletes_file.endswith(".parquet") else FileFormat.ORC diff --git a/tests/table/test_delete_file_index.py b/tests/table/test_delete_file_index.py index 09dd9ac81b..b570a4a8ee 100644 --- a/tests/table/test_delete_file_index.py +++ b/tests/table/test_delete_file_index.py @@ -16,12 +16,22 @@ # under the License. import pytest +from pyiceberg.conversions import to_bytes from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus +from pyiceberg.schema import Schema from pyiceberg.table.delete_file_index import PATH_FIELD_ID, DeleteFileIndex, PositionDeletes from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField -def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int = 0) -> DataFile: +def _create_data_file( + file_path: str = "s3://bucket/data.parquet", + spec_id: int = 0, + lower_bounds: dict[int, bytes] | None = None, + upper_bounds: dict[int, bytes] | None = None, + null_value_counts: dict[int, int] | None = None, + value_counts: dict[int, int] | None = None, +) -> DataFile: data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, @@ -29,6 +39,10 @@ def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int partition=Record(), record_count=100, file_size_in_bytes=1000, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + null_value_counts=null_value_counts, + value_counts=value_counts, ) data_file._spec_id = spec_id return data_file @@ -81,6 +95,31 @@ def _create_deletion_vector( return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) +def _create_equality_delete( + sequence_number: int = 1, + spec_id: int = 0, + lower_bounds: dict[int, bytes] | None = None, + upper_bounds: dict[int, bytes] | None = None, + null_value_counts: dict[int, int] | None = None, + value_counts: dict[int, int] | None = None, +) -> ManifestEntry: + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + equality_ids=[1], + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + null_value_counts=null_value_counts, + value_counts=value_counts, + ) + delete_file._spec_id = spec_id + return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) + + def test_empty_index() -> None: index = DeleteFileIndex() data_file = _create_data_file() @@ -187,3 +226,54 @@ def test_record_equality_for_partition_lookup() -> None: assert len(index.for_data_file(1, data_file, partition_b)) == 1 assert len(index.for_data_file(1, data_file, partition_c)) == 0 + + +def test_equality_delete_sequence_number_filtering() -> None: + index = DeleteFileIndex() + eq_delete = _create_equality_delete(sequence_number=2) + index.add_delete_file(eq_delete) + + data_file = _create_data_file() + + assert eq_delete.data_file in index.for_data_file(1, data_file) + assert eq_delete.data_file not in index.for_data_file(2, data_file) + assert eq_delete.data_file not in index.for_data_file(3, data_file) + + +def test_equality_delete_metrics_filtering() -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + index = DeleteFileIndex(schema=schema) + eq_delete = _create_equality_delete( + sequence_number=100, + lower_bounds={1: to_bytes(IntegerType(), 10)}, + upper_bounds={1: to_bytes(IntegerType(), 20)}, + ) + index.add_delete_file(eq_delete) + + no_overlap = _create_data_file( + "s3://bucket/no-overlap.parquet", + lower_bounds={1: to_bytes(IntegerType(), 0)}, + upper_bounds={1: to_bytes(IntegerType(), 5)}, + ) + overlap = _create_data_file( + "s3://bucket/overlap.parquet", + lower_bounds={1: to_bytes(IntegerType(), 15)}, + upper_bounds={1: to_bytes(IntegerType(), 25)}, + ) + + assert eq_delete.data_file not in index.for_data_file(1, no_overlap) + assert eq_delete.data_file in index.for_data_file(1, overlap) + + +def test_equality_delete_null_count_filtering() -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + index = DeleteFileIndex(schema=schema) + eq_delete = _create_equality_delete( + sequence_number=10, + null_value_counts={1: 10}, + value_counts={1: 10}, + ) + index.add_delete_file(eq_delete) + data_file = _create_data_file(null_value_counts={1: 0}, value_counts={1: 100}) + + assert eq_delete.data_file not in index.for_data_file(1, data_file)