Skip to content

Core: Arrow position delete reader V2#16440

Draft
Baunsgaard wants to merge 8 commits into
apache:mainfrom
Baunsgaard:arrow-position-delete-reader
Draft

Core: Arrow position delete reader V2#16440
Baunsgaard wants to merge 8 commits into
apache:mainfrom
Baunsgaard:arrow-position-delete-reader

Conversation

@Baunsgaard
Copy link
Copy Markdown
Contributor

@Baunsgaard Baunsgaard commented May 20, 2026

Add VectorizedPositionDeleteReader, an Arrow-vectorized reader for V2 position delete files. Reads (file_path, pos) directly from Arrow VarChar / BigInt buffers and feeds the shared RangeAccumulator from #16052, so consecutive positions become PositionDeleteIndex.delete(start, end) range inserts. No per-row Java allocation on the hot path.

Stacked on #16052, that PR adds the coalescing primitive in iceberg-core; this PR wires it into the Arrow read path.

BaseDeleteLoader.readPosDeletes now dispatches through a new PositionDeleteIndexReader SPI on FormatModelRegistry, so the Arrow path is picked up automatically when iceberg-arrow is on the classpath.

Like #16052, this primarily benefits Iceberg V2 tables; V3 DVs deserialize directly and bypass both paths.

Single-file read (PositionDeleteReaderBenchmark, single-shot)

Mode Distribution Baseline (s) Vectorized (s) Speedup
No filter dense 0.130 0.047 2.77x
No filter sparse 0.147 0.069 2.13x
Filtered by path dense 0.306 0.323 0.95x
Filtered by path sparse 0.338 0.356 0.95x

Filtered cases are bounded by per-row `file_path` comparison cost; both readers pay the same floor there.

Multi-file via BaseDeleteLoader (BaseDeleteLoaderBenchmark, average time)

Distribution v2-legacy (ms) v2-arrow (ms) v3-dv (ms) v2-arrow vs v2-legacy v3-dv vs v2-arrow
dense 152.99 ± 3.72 120.72 ± 20.00 0.012 ± 0.001 1.27x ~10,000x
sparse 163.42 ± 3.31 122.50 ± 2.84 1.025 ± 0.006 1.33x ~120x

V3 DVs (Puffin + RoaringBitmap) are on a different order of magnitude: this PR brings the V2 path faster. Recommended takeaway for reviewers: this is the right reader for tables that remain on V2, not a reason to stay on V2.

Add PositionDeleteRangeConsumer that coalesces runs of consecutive
positions into a single delete(start, end) call on a PositionDeleteIndex,
and use it from Deletes.toPositionIndex() so sorted position delete
files are inserted into the bitmap as ranges instead of one position at
a time.

The RangeAccumulator inner class exposes two entry points so callers can
pick the cheapest one for their layout:

- accept(long) for streaming, per-element callers that genuinely do not
  buffer (mostly tests).
- acceptAll(long[], int, int) for callers that already have positions
  in a primitive array. The bulk path runs the same sniff + coalesce
  state machine but keeps it out of the per-element frame, so steady
  state is one gap-check branch per position rather than a four-branch
  dispatch. This is the path the Arrow vectorized reader uses.

PositionDeleteRangeConsumer.forEach is the entry point for boxed
Iterable<Long> sources (the record-reader path in Deletes.toPositionIndex).
It buffers positions into a small primitive long[64] -- 512 bytes,
comfortably L1-resident -- and feeds each batch to acceptAll. Benchmarks
show this captures the full bulk-path win on dense inputs (about 12%
faster and ~10x lower run-to-run variance) at 1/16 the allocation cost
of larger drain buffers; the inner-loop throughput of acceptAll is the
same regardless of slice length, so a larger buffer would just allocate
more without buying anything.
Introduce a new SPI in FormatModelRegistry so engines that can decode
position-delete files into bitmap indexes more efficiently than the
generic record reader can register a fast path per FileFormat:

- Add the PositionDeleteIndexReader interface with two operations:
  read(file, dataLocation, deleteFile) for a single data-file filter,
  and readAll(file, deleteFile) for the cache path that wants every
  data file's bitmap.
- Add registerPositionDeleteIndexReader(FileFormat, reader) and
  positionDeleteIndexReader(FileFormat) lookup on FormatModelRegistry.
  Registration is once-per-format with the same exclusivity contract
  as existing FormatModel registration; lookup returns Optional so
  callers can fall back when no reader is registered.

This commit only adds the registry capability. Subsequent commits
register an Arrow implementation and have BaseDeleteLoader consult
the registry before opening the per-row record reader.
…te files

Adds a public VectorizedPositionDeleteReader in iceberg-arrow that
materializes a PositionDeleteIndex from a Parquet position-delete file
without going through Record / boxed-Long iteration:
  - reads pos as a primitive long via ArrowBuf.getLong (pos is required
    by the Iceberg position-delete spec, so the per-row validity check
    is intentionally skipped; Javadoc documents the invariant);
  - reads file_path via VarCharVector#getArrowVector so dictionary-
    encoded paths decode once, then compares UTF-8 bytes directly
    against a single byte[] target built per read;
  - coalesces consecutive matching positions into delete(start, end+1)
    range inserts via an internal RangeCoalescer shared by the
    no-filter and filtered paths;
  - projects pos only when no path filter is requested, the typical
    deletion-vector case.

Adds public PositionDeleteIndex.create() and create(DeleteFile)
factories in iceberg-core so callers outside the deletes package can
build a mutable index without going through CloseableIterable<Long>
or referencing the package-private BitmapPositionDeleteIndex. Both
factories are documented as mutable and not safe for concurrent
mutation, with @see cross-references between empty(), create(), and
create(DeleteFile).

Tests:
  - core: 3 cases for the new factories (empty mutable index without
    provenance, non-null DeleteFile recorded as provenance, null
    DeleteFile yields no provenance).
  - arrow: 7 test methods (9 invocations including a parameterized
    invalid-batch-size case) covering filter-by-path with two data
    files, dense coalescing on 50k contiguous positions, no-filter
    mode unioning positions across two data files, null-input
    rejection, batch-size equivalence, and DeleteFile provenance
    through the reader.

No new dependencies and no engine wiring: engines opt in by calling
VectorizedPositionDeleteReader.read(...) directly; ArrowReader's
behavior on delete files is unchanged.
Reuse existing primitives and broaden test coverage based on alignment
review:

- Reuse DeleteSchemaUtil.pathPosSchema() instead of re-declaring
  the (file_path, pos) schema locally; this is the same schema used
  by the rest of the delete-file pipeline.
- Replace the hard-coded `setArrowValidityVector= true` with
  NullCheckingForGet.NULL_CHECKING_ENABLED, matching the convention
  in ArrowFormatModels.
- Document why DEFAULT_BATCH_SIZE (8192) diverges from the
  VectorizedArrowReader default (5000): position-delete files
  project at most two narrow columns, so a larger batch amortises
  per-batch decoding cost.
- Make TestVectorizedPositionDeleteReader public to match the rest
  of the iceberg-arrow test suite.
- Add three tests: dictionary-encoded file_path filtering, an
  unknown-path filter that returns an empty index, and a check that
  unreadable input surfaces a RuntimeException with the file
  location preserved (the Parquet stack wraps IOExceptions as
  RuntimeIOException, so the read() catch block fires only on close
  failures).
- Tighten an assertion label in filtersByDataFilePath, drop a
  redundant exists/delete in the test helper, and break a 105-char
  line in honorsExplicitBatchSize.
Wire the zero-copy position-delete reader into the new SPI so delete
loaders can find it through FormatModelRegistry instead of via a hard
iceberg-arrow dependency:

- Add VectorizedPositionDeleteReader.readAllByDataFile(...) which
  decodes an entire delete file into one PositionDeleteIndex per data
  file path. The implementation tracks the active path's bytes and
  reuses the existing range coalescer; it works on both sorted and
  unsorted delete files (the latter by reusing an existing index in
  the result map when a path reappears).
- Have ArrowFormatModels.register() also register an Arrow-backed
  PositionDeleteIndexReader for FileFormat.PARQUET.
- Cover the new entry point with tests for the grouped read,
  contiguous-run coalescing, null input rejection, and a registry
  round-trip that proves ArrowFormatModels.register() wires the SPI
  when iceberg-arrow is on the classpath.
Have BaseDeleteLoader consult FormatModelRegistry for a registered
PositionDeleteIndexReader before opening the per-row record reader.
When iceberg-arrow is on the classpath, parquet position-delete files
go through the Arrow zero-copy reader; when no reader is registered
for the format, the existing generic-record path is used unchanged.

The change is contained to readPosDeletes(...) in both the cached and
uncached branches, so estimateSize and merge semantics are unaffected.

Add a focused test that registers a stub reader and verifies the
loader forwards the data-file path to it, plus a control test that
asserts the unchanged fallback behavior when no reader is registered.
Replace the per-row full byte comparison in appendGroupedByPath with a
length-plus-first-byte fast filter. The common case for position delete
files (sorted by file_path, single data file dominating a batch) now pays
roughly three buffer reads per row instead of one read per byte of the
path, with a single full-path verification at the end of the run.

A divergent middle byte at the same length and first byte (only possible
for unsorted files) is still handled correctly: when the end-of-run
verification disagrees, the method falls back to the per-row full byte
comparison so the grouping stays exact.
Replace the package-local RangeCoalescer with the public
PositionDeleteRangeConsumer.RangeAccumulator primitive introduced in
iceberg-core, and switch each of the three hot loops to the bulk
acceptAll(long[], int, int) entry point so the per-row work stays
inside the accumulator's tight loop.

- appendAll: copies each batch's positions into a scratch long[] once
  and bulk-accepts the whole batch.
- appendFiltered: packs the matching positions of each batch into the
  scratch buffer and bulk-accepts per matching run; a non-matching row
  drains the buffer and flushes the accumulator so a gap caused by
  another data file never coalesces.
- appendGroupedByPath: bulk-accepts each path run (already bounded by
  endOfPathRun) into the active per-path accumulator.

The breakRun() call sites switch to flush() (same semantics: emit the
active run, allow the next accept to start a new one). The scratch
buffer is grown lazily to the largest batch size seen and reused
across batches.
@Baunsgaard Baunsgaard force-pushed the arrow-position-delete-reader branch from a3d29ee to 1f54776 Compare May 20, 2026 11:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant