Core: Arrow position delete reader V2 #16440
Draft
Baunsgaard wants to merge 8 commits into
Add PositionDeleteRangeConsumer that coalesces runs of consecutive
positions into a single delete(start, end) call on a
PositionDeleteIndex, and use it from Deletes.toPositionIndex() so
sorted position delete files are inserted into the bitmap as ranges
instead of one position at a time.

The RangeAccumulator inner class exposes two entry points so callers
can pick the cheapest one for their layout:
- accept(long) for streaming, per-element callers that genuinely do
not buffer (mostly tests).
- acceptAll(long[], int, int) for callers that already have positions
in a primitive array. The bulk path runs the same sniff + coalesce
state machine but keeps it out of the per-element frame, so steady
state is one gap-check branch per position rather than a four-branch
dispatch. This is the path the Arrow vectorized reader uses.

PositionDeleteRangeConsumer.forEach is the entry point for boxed
Iterable<Long> sources (the record-reader path in
Deletes.toPositionIndex). It buffers positions into a small primitive
long[64] -- 512 bytes, comfortably L1-resident -- and feeds each batch
to acceptAll. Benchmarks show this captures the full bulk-path win on
dense inputs (about 12% faster and ~10x lower run-to-run variance) at
1/16 the allocation cost of larger drain buffers; the inner-loop
throughput of acceptAll is the same regardless of slice length, so a
larger buffer would just allocate more without buying anything.
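For readers who want a concrete picture of the coalescing described above, here is a minimal sketch. It is not the code from this PR: RecordingIndex and RangeAccumulatorSketch are hypothetical stand-ins, the exclusive end bound is an assumption, and the "sniff" stage mentioned in the commit message is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a PositionDeleteIndex; it only records range inserts. */
final class RecordingIndex {
  final List<long[]> ranges = new ArrayList<>();

  // Assumption: the end bound is exclusive.
  void delete(long start, long endExclusive) {
    ranges.add(new long[] {start, endExclusive});
  }
}

/** Illustrative accumulator: turns runs of consecutive positions into one range insert. */
final class RangeAccumulatorSketch {
  private final RecordingIndex index;
  private boolean active = false;
  private long runStart;
  private long runLast;

  RangeAccumulatorSketch(RecordingIndex index) {
    this.index = index;
  }

  /** Streaming entry point: one position per call. */
  void accept(long pos) {
    if (active && pos == runLast + 1) {
      runLast = pos; // extends the current run
      return;
    }
    flush();
    active = true;
    runStart = pos;
    runLast = pos;
  }

  /** Bulk entry point: one gap check per position inside a tight loop. */
  void acceptAll(long[] positions, int offset, int length) {
    if (length == 0) {
      return;
    }
    int i = offset;
    int end = offset + length;
    if (!active) {
      runStart = positions[i++];
      runLast = runStart;
      active = true;
    }
    long last = runLast;
    for (; i < end; i++) {
      long pos = positions[i];
      if (pos != last + 1) {
        index.delete(runStart, last + 1); // gap: emit the finished run
        runStart = pos;
      }
      last = pos;
    }
    runLast = last; // the run may continue in the next batch
  }

  /** Emits the active run, if any, so the next accept starts a new one. */
  void flush() {
    if (active) {
      index.delete(runStart, runLast + 1);
      active = false;
    }
  }

  /** Boxed-source entry point: drains into a small primitive buffer, then bulk-accepts. */
  static void forEach(Iterable<Long> positions, RecordingIndex index) {
    RangeAccumulatorSketch acc = new RangeAccumulatorSketch(index);
    long[] buffer = new long[64]; // 64 * 8 bytes = 512 bytes
    int n = 0;
    for (long pos : positions) {
      buffer[n++] = pos;
      if (n == buffer.length) {
        acc.acceptAll(buffer, 0, n);
        n = 0;
      }
    }
    acc.acceptAll(buffer, 0, n);
    acc.flush();
  }
}
```

With this sketch, feeding the positions 1, 2, 3, 7, 8, 20 through forEach records three inserts: [1, 4), [7, 9) and [20, 21). A run that straddles two buffer drains stays a single range because the accumulator keeps its run state between acceptAll calls.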
Introduce a new SPI in FormatModelRegistry so engines that can decode
position-delete files into bitmap indexes more efficiently than the
generic record reader can register a fast path per FileFormat:
- Add the PositionDeleteIndexReader interface with two operations:
read(file, dataLocation, deleteFile) for a single data-file filter,
and readAll(file, deleteFile) for the cache path that wants every
data file's bitmap.
- Add registerPositionDeleteIndexReader(FileFormat, reader) and
positionDeleteIndexReader(FileFormat) lookup on FormatModelRegistry.
Registration is once-per-format with the same exclusivity contract as
existing FormatModel registration; lookup returns Optional so callers
can fall back when no reader is registered.

This commit only adds the registry capability. Subsequent commits
register an Arrow implementation and have BaseDeleteLoader consult the
registry before opening the per-row record reader.
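The register-once, look-up-with-fallback shape described above could look roughly like the sketch below. All names here (FormatSketch, IndexReaderSketch, ReaderRegistrySketch, LoaderSketch) are hypothetical, the reader signature is simplified to plain strings and a long[], and throwing IllegalArgumentException on a duplicate registration is an assumption about what "exclusivity" means, not something the commit message states.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a file-format enum. */
enum FormatSketch { PARQUET, AVRO, ORC }

/** Hypothetical, simplified reader SPI: returns deleted positions for one data file. */
interface IndexReaderSketch {
  long[] read(String deleteFileLocation, String dataLocation);
}

final class ReaderRegistrySketch {
  private static final Map<FormatSketch, IndexReaderSketch> READERS = new ConcurrentHashMap<>();

  private ReaderRegistrySketch() {}

  /** Registers at most one reader per format (assumed to reject duplicates). */
  static void register(FormatSketch format, IndexReaderSketch reader) {
    IndexReaderSketch existing = READERS.putIfAbsent(format, reader);
    if (existing != null) {
      throw new IllegalArgumentException("Reader already registered for " + format);
    }
  }

  /** Returns the registered reader, or empty so the caller can fall back. */
  static Optional<IndexReaderSketch> lookup(FormatSketch format) {
    return Optional.ofNullable(READERS.get(format));
  }
}

final class LoaderSketch {
  private LoaderSketch() {}

  /** Prefers a registered fast path; otherwise uses the generic per-row path. */
  static long[] readPosDeletes(FormatSketch format, String deleteFile, String dataLocation) {
    return ReaderRegistrySketch.lookup(format)
        .map(reader -> reader.read(deleteFile, dataLocation))
        .orElseGet(() -> genericRecordRead(deleteFile, dataLocation));
  }

  /** Placeholder for the generic record reader; returns no positions in this sketch. */
  static long[] genericRecordRead(String deleteFile, String dataLocation) {
    return new long[0];
  }
}
```

Returning Optional from the lookup keeps the fallback decision at the call site, so a loader written this way behaves the same as before whenever nothing has been registered for the format.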
…te files
Adds a public VectorizedPositionDeleteReader in iceberg-arrow that
materializes a PositionDeleteIndex from a Parquet position-delete file
without going through Record / boxed-Long iteration:
- reads pos as a primitive long via ArrowBuf.getLong (pos is required
by the Iceberg position-delete spec, so the per-row validity check
is intentionally skipped; Javadoc documents the invariant);
- reads file_path via VarCharVector#getArrowVector so dictionary-
encoded paths decode once, then compares UTF-8 bytes directly
against a single byte[] target built per read;
- coalesces consecutive matching positions into delete(start, end+1)
range inserts via an internal RangeCoalescer shared by the
no-filter and filtered paths;
- projects pos only when no path filter is requested, the typical
deletion-vector case.
Adds public PositionDeleteIndex.create() and create(DeleteFile)
factories in iceberg-core so callers outside the deletes package can
build a mutable index without going through CloseableIterable<Long>
or referencing the package-private BitmapPositionDeleteIndex. Both
factories are documented as mutable and not safe for concurrent
mutation, with @see cross-references between empty(), create(), and
create(DeleteFile).
Tests:
- core: 3 cases for the new factories (empty mutable index without
provenance, non-null DeleteFile recorded as provenance, null
DeleteFile yields no provenance).
- arrow: 7 test methods (9 invocations including a parameterized
invalid-batch-size case) covering filter-by-path with two data
files, dense coalescing on 50k contiguous positions, no-filter
mode unioning positions across two data files, null-input
rejection, batch-size equivalence, and DeleteFile provenance
through the reader.
No new dependencies and no engine wiring: engines opt in by calling
VectorizedPositionDeleteReader.read(...) directly; ArrowReader's
behavior on delete files is unchanged.
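The direct UTF-8 comparison of file_path mentioned in this commit can be illustrated without Arrow on the classpath. The sketch below uses plain arrays as stand-ins for a variable-width column: an int offsets array with rowCount + 1 entries and a contiguous data byte array, which mirrors Arrow's variable-size binary layout. PathFilterSketch and its method names are hypothetical and are not the reader's actual API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PathFilterSketch {
  private PathFilterSketch() {}

  /** Row i's bytes are data[offsets[i] .. offsets[i + 1]); compared without building a String. */
  static boolean matches(int[] offsets, byte[] data, int row, byte[] target) {
    return Arrays.equals(data, offsets[row], offsets[row + 1], target, 0, target.length);
  }

  /** Returns the positions whose file_path equals dataLocation, in row order. */
  static long[] filter(int[] offsets, byte[] data, long[] pos, String dataLocation) {
    // The target is encoded once per read, not once per row.
    byte[] target = dataLocation.getBytes(StandardCharsets.UTF_8);
    long[] out = new long[pos.length];
    int n = 0;
    for (int row = 0; row < pos.length; row++) {
      if (matches(offsets, data, row, target)) {
        out[n++] = pos[row];
      }
    }
    return Arrays.copyOf(out, n);
  }
}
```

The range overload of Arrays.equals (Java 9+) compares lengths first, so paths of different lengths are rejected without touching their bytes.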
Reuse existing primitives and broaden test coverage based on alignment
review:
- Reuse DeleteSchemaUtil.pathPosSchema() instead of re-declaring the
(file_path, pos) schema locally; this is the same schema used by the
rest of the delete-file pipeline.
- Replace the hard-coded `setArrowValidityVector= true` with
NullCheckingForGet.NULL_CHECKING_ENABLED, matching the convention in
ArrowFormatModels.
- Document why DEFAULT_BATCH_SIZE (8192) diverges from the
VectorizedArrowReader default (5000): position-delete files project at
most two narrow columns, so a larger batch amortises per-batch
decoding cost.
- Make TestVectorizedPositionDeleteReader public to match the rest of
the iceberg-arrow test suite.
- Add three tests: dictionary-encoded file_path filtering, an
unknown-path filter that returns an empty index, and a check that
unreadable input surfaces a RuntimeException with the file location
preserved (the Parquet stack wraps IOExceptions as RuntimeIOException,
so the read() catch block fires only on close failures).
- Tighten an assertion label in filtersByDataFilePath, drop a
redundant exists/delete in the test helper, and break a 105-char line
in honorsExplicitBatchSize.
Wire the zero-copy position-delete reader into the new SPI so delete
loaders can find it through FormatModelRegistry instead of via a hard
iceberg-arrow dependency:
- Add VectorizedPositionDeleteReader.readAllByDataFile(...) which
decodes an entire delete file into one PositionDeleteIndex per data
file path. The implementation tracks the active path's bytes and
reuses the existing range coalescer; it works on both sorted and
unsorted delete files (the latter by reusing an existing index in the
result map when a path reappears).
- Have ArrowFormatModels.register() also register an Arrow-backed
PositionDeleteIndexReader for FileFormat.PARQUET.
- Cover the new entry point with tests for the grouped read,
contiguous-run coalescing, null input rejection, and a registry
round-trip that proves ArrowFormatModels.register() wires the SPI when
iceberg-arrow is on the classpath.
Have BaseDeleteLoader consult FormatModelRegistry for a registered PositionDeleteIndexReader before opening the per-row record reader. When iceberg-arrow is on the classpath, parquet position-delete files go through the Arrow zero-copy reader; when no reader is registered for the format, the existing generic-record path is used unchanged. The change is contained to readPosDeletes(...) in both the cached and uncached branches, so estimateSize and merge semantics are unaffected. Add a focused test that registers a stub reader and verifies the loader forwards the data-file path to it, plus a control test that asserts the unchanged fallback behavior when no reader is registered.
Replace the per-row full byte comparison in appendGroupedByPath with a length-plus-first-byte fast filter. The common case for position delete files (sorted by file_path, single data file dominating a batch) now pays roughly three buffer reads per row instead of one read per byte of the path, with a single full-path verification at the end of the run. A divergent middle byte at the same length and first byte (only possible for unsorted files) is still handled correctly: when the end-of-run verification disagrees, the method falls back to the per-row full byte comparison so the grouping stays exact.
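One way to picture the length-plus-first-byte filter is the sketch below. It is an illustration under stated assumptions, not the appendGroupedByPath code: it uses plain arrays for the offsets and data buffers, PathRunSketch and sameBytes are hypothetical names, and the single end-of-run verification is only sufficient when rows are sorted by file_path (where an equal first and last row imply the rows between are equal too). How the PR keeps the grouping exact for unsorted files is not reproduced here.

```java
import java.util.Arrays;

final class PathRunSketch {
  private PathRunSketch() {}

  /**
   * Returns the exclusive end row of the run of rows sharing row startRow's path.
   * Assumes rows are sorted by path (see the lead-in).
   */
  static int endOfPathRun(int[] offsets, byte[] data, int startRow, int rowCount) {
    int start = offsets[startRow];
    int len = offsets[startRow + 1] - start;
    byte first = len > 0 ? data[start] : 0;

    // Fast scan: two offset reads plus one data byte per row.
    int end = startRow + 1;
    while (end < rowCount) {
      int s = offsets[end];
      int l = offsets[end + 1] - s;
      if (l != len || (len > 0 && data[s] != first)) {
        break;
      }
      end++;
    }

    // One full comparison at the end of the candidate run.
    if (sameBytes(offsets, data, startRow, end - 1)) {
      return end;
    }

    // Verification disagreed: fall back to an exact per-row comparison.
    end = startRow + 1;
    while (end < rowCount && sameBytes(offsets, data, startRow, end)) {
      end++;
    }
    return end;
  }

  /** Full byte comparison of two rows of the same column. */
  static boolean sameBytes(int[] offsets, byte[] data, int a, int b) {
    return Arrays.equals(
        data, offsets[a], offsets[a + 1],
        data, offsets[b], offsets[b + 1]);
  }
}
```

Paths that share a length and first byte (for example sibling files in one directory) pass the fast scan, which is why the end-of-run check and the exact fallback are needed at all.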
Replace the package-local RangeCoalescer with the public
PositionDeleteRangeConsumer.RangeAccumulator primitive introduced in
iceberg-core, and switch each of the three hot loops to the bulk
acceptAll(long[], int, int) entry point so the per-row work stays
inside the accumulator's tight loop.
- appendAll: copies each batch's positions into a scratch long[] once
and bulk-accepts the whole batch.
- appendFiltered: packs the matching positions of each batch into the
scratch buffer and bulk-accepts per matching run; a non-matching row
drains the buffer and flushes the accumulator so a gap caused by
another data file never coalesces.
- appendGroupedByPath: bulk-accepts each path run (already bounded by
endOfPathRun) into the active per-path accumulator.

The breakRun() call sites switch to flush() (same semantics: emit the
active run, allow the next accept to start a new one). The scratch
buffer is grown lazily to the largest batch size seen and reused
across batches.
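The appendFiltered behaviour described above (pack matches into a scratch buffer, drain and flush on a non-matching row) might be sketched as follows. FilteredAppendSketch and its Sink interface are hypothetical; Sink merely stands in for the accumulator's acceptAll and flush, and the per-row match result is passed in as a boolean[] instead of being computed from file_path bytes.

```java
final class FilteredAppendSketch {
  /** Hypothetical stand-in for the accumulator's bulk entry point and flush. */
  interface Sink {
    void acceptAll(long[] positions, int offset, int length);

    void flush();
  }

  // Grown lazily to the largest batch seen, then reused across batches.
  private long[] scratch = new long[0];

  void appendFiltered(long[] positions, boolean[] matches, int rowCount, Sink sink) {
    if (scratch.length < rowCount) {
      scratch = new long[rowCount];
    }
    int n = 0;
    for (int row = 0; row < rowCount; row++) {
      if (matches[row]) {
        scratch[n++] = positions[row]; // pack the matching run
      } else {
        if (n > 0) {
          sink.acceptAll(scratch, 0, n); // drain the run that just ended
          n = 0;
        }
        // Flush once per gap; at row 0 a run may still be open from the previous batch.
        if (row == 0 || matches[row - 1]) {
          sink.flush();
        }
      }
    }
    if (n > 0) {
      sink.acceptAll(scratch, 0, n); // trailing run stays open for the next batch
    }
  }
}
```

In this sketch a batch that ends on a matching row is drained but not flushed, so a run can continue across the batch boundary; the caller is expected to flush once after the last batch.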
Add VectorizedPositionDeleteReader, an Arrow-vectorized reader for V2 position delete files. It reads (file_path, pos) directly from Arrow VarChar/BigInt buffers and feeds the shared RangeAccumulator from #16052, so consecutive positions become PositionDeleteIndex.delete(start, end) range inserts. No per-row Java allocation on the hot path.
Stacked on #16052: that PR adds the coalescing primitive in iceberg-core; this PR wires it into the Arrow read path. BaseDeleteLoader.readPosDeletes now dispatches through a new PositionDeleteIndexReader SPI on FormatModelRegistry, so the Arrow path is picked up automatically when iceberg-arrow is on the classpath.
Like #16052, this primarily benefits Iceberg V2 tables; V3 DVs deserialize directly and bypass both paths.
Single-file read (PositionDeleteReaderBenchmark, single-shot)
Multi-file via BaseDeleteLoader (BaseDeleteLoaderBenchmark, average time)