Skip to content

File-parallel pipeline with nested scan API and with_file_prefetch_depth!#92

Open
robertbuessow wants to merge 23 commits intomainfrom
rb-split-scan-3
Open

File-parallel pipeline with nested scan API and with_file_prefetch_depth!#92
robertbuessow wants to merge 23 commits intomainfrom
rb-split-scan-3

Conversation

@robertbuessow
Copy link
Copy Markdown
Contributor

@robertbuessow robertbuessow commented Apr 30, 2026

Summary

  • Replaces the iceberg-rs to_arrow() call with a custom file-parallel pipeline (ordered_file_pipeline.rs) that processes N files concurrently while yielding batches in strict file-then-row order
  • Adds a nested scan API (iceberg_file_scan_stream / scan_nested!) that exposes one FileScan per file — each carrying the filename, record count, and a prefetched inner batch stream — so Julia callers can handle per-file logic without reimplementing the pipeline
  • Adds with_file_prefetch_depth! to independently control how many completed FileScan items queue in the outer channel, decoupling prefetch buffering from file concurrency
  • Adds pipeline_stats for timing and throughput observability across pipeline phases (reader setup, fetch/decode, serialize, backpressure)
  • Adds cargo test to CI so the Rust unit test suite runs on every PR

Test plan

  • make run-containers then make test — full Julia test suite passes on Julia 1.10 and 1.11
  • cargo test in iceberg_rust_ffi/ passes
  • cargo fmt --check passes
  • Verify with_file_prefetch_depth!(scan, UInt(4)) sets the outer channel depth independently of with_data_file_concurrency_limit!
  • Verify scan_nested! yields one FileScan per file in order, with correct filename and record count

🤖 Generated with Claude Code
Overarching task tracking Iceberg-load optimizations: RAI-49519.

hall-alex and others added 19 commits April 6, 2026 16:33
Replace the to_arrow() code path with a custom file-parallel pipeline
that processes N parquet files concurrently while preserving strict
file-then-row ordering for Julia consumption.

Architecture:
- plan_files() provides an ordered list of FileScanTasks
- Each file task runs as a background tokio task with its own mpsc
  channel and per-file Semaphore(100MB) for backpressure
- FuturesOrdered yields per-file receivers in file order; the consumer
  drains file 0 batch-by-batch, then file 1, etc.
- Each file uses the same iceberg-rs ArrowReader code path as before

Changes:
- New: ordered_file_pipeline.rs (~420 lines incl. profiling stats)
- full.rs: IcebergScan gains file_io, batch_size, file_concurrency
  fields; iceberg_arrow_stream uses plan_files() + pipeline
- scan_common.rs / incremental.rs: pass through new fields in macros
- Temporary PipelineStats with FFI print_summary for benchmarking

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Exposes a new iceberg_file_scan_stream / scan_nested! API that yields one
IcebergFileScan per file, each carrying the filename, record count, and a
prefetched inner IcebergArrowStream of Arrow IPC batches. The existing flat
scan! path is kept by re-implementing it as a thin flattening wrapper over
the same nested pipeline, so all ordering, backpressure, and stats logic
lives in one place.

Key changes:
- ordered_file_pipeline: add FileScan (internal type), make_file_stream,
  spawn_file_task_with_meta, run_nested / create_nested_pipeline; rewrite
  create_pipeline to flatten the nested pipeline. Switch run_nested from
  FuturesOrdered to FuturesUnordered (spawn futures resolve immediately so
  ordering is irrelevant here).
- table.rs: add IcebergFileScan (#[repr(C)]), IcebergFileScanStream,
  IcebergFileScanResponse; add iceberg_file_scan_free,
  iceberg_file_scan_stream_free, iceberg_file_scan_record_count,
  iceberg_file_scan_filename (sync getters), iceberg_next_file_scan (async).
- full.rs: add iceberg_file_scan_stream export op; make IcebergScan.file_io
  non-optional (FileIO instead of Option<FileIO>) — always set at
  construction, use .clone() in stream ops instead of .take().
- Julia: scan_nested!, nested_arrow_stream, next_file_scan,
  file_scan_record_count, file_scan_filename, file_scan_arrow_stream,
  free_file_scan!, free_file_scan_stream! exported from RustyIceberg.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Replace all inline iceberg::Error::new(iceberg::ErrorKind::Unexpected, ...)
callsites with a crate-wide pub(crate) helper in lib.rs, removing the
boilerplate from catalog.rs, lib.rs, and ordered_file_pipeline.rs.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add add_elapsed, store_elapsed, and async timed methods to PipelineStats
to eliminate the repeated start/record/elapsed boilerplate. The three async
phases (fetch+decode, serialize, semaphore) now read as single timed()
expressions; the sync reader-setup phase uses add_elapsed directly.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Tests (test/scan_tests.jl):
- Basic iteration: verify each FileScan has a non-empty .parquet filename,
  record_count > 0, and the nations table totals exactly 25 records.
- Row count consistency: nested and flat pipelines over the customer table
  must produce identical total row counts.
- Correct data: nested scan over nations produces the same rows as the
  existing flat-scan reference test.
- Builder methods: select_columns! and with_batch_size! are respected
  per-batch in the nested path.
- Safe early drop: abandon a FileScan mid-stream and drop the outer stream
  early; background file tasks see the dropped receiver and exit cleanly.

Also fixes the timed! macro call for the serialize phase: the block form
{ expr.await.map_err(...)? } was incorrectly evaluating to () before the
macro's own .await ran. The .map_err chains now appear outside the macro
invocation where they belong.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ll time

PipelineStats unit tests (12 tests):
- reset_clears_every_field, task_start/end concurrency tracking, peak
  high-water mark, buffer add/release/peak, add_elapsed accumulation,
  store_elapsed overwrite, field isolation.

Dispatch backpressure metric:
- Rename dead consumer_wait_ns → file_dispatch_wait_ns. Track time
  run_nested spends blocked on the outer channel (slow consumer) using
  timed!(file_dispatch_wait_ns, tx.send(...)). Shown as "dispatch stall"
  in print_summary.

Nested pipeline wall time:
- run_nested now records pipeline_wall_ns when it finishes dispatching all
  FileScan handles. For the flat path run_flat overwrites it with the full
  end-to-end time; for the nested path this gives a non-zero throughput
  in print_summary instead of always showing 0.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot <copilot@github.com>
Adds a new scan configuration parameter that controls how many completed
FileScan items are queued in the outer channel of the nested pipeline,
decoupling file-I/O concurrency from how far ahead Rust prefetches for
the Julia consumer.

Previously the outer channel capacity was hardcoded to `file_concurrency`,
coupling the two. Now they are independent: `file_concurrency` sets how
many files are processed in parallel, while `file_prefetch_depth` controls
how many ready FileScan items can buffer up before run_nested blocks.
When `file_prefetch_depth` is 0 (default), it falls back to `file_concurrency`
preserving existing behaviour.

Changes:
- IcebergScan/IcebergIncrementalScan: add file_prefetch_depth field
- scan_common.rs: propagate field through all struct-reconstructing macros;
  add impl_with_file_prefetch_depth! macro
- full.rs: wire iceberg_scan_with_file_prefetch_depth FFI function; resolve
  prefetch_depth in both iceberg_arrow_stream and iceberg_file_scan_stream
- ordered_file_pipeline: add prefetch_depth parameter to create_nested_pipeline
  and create_pipeline; use it for the outer FileScan channel capacity
- Julia: with_file_prefetch_depth! was already defined; move export to correct line

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Extract resolve_pipeline_params() in full.rs to deduplicate the
  concurrency/prefetch_depth/file_io/batch_size resolution logic shared
  by iceberg_arrow_stream and iceberg_file_scan_stream sync blocks
- Update pipeline_stats.rs header comment to remove "temporary / will be
  removed" framing — the module is intended to stay

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Runs the Rust unit test suite on every PR and push to main, alongside the
existing format check. Uses rust-cache to keep incremental build times low.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
## Summary

- **Nested pipeline API**: `scan_nested!` / `iceberg_file_scan_stream`
returns an outer `FileScanStream` that yields one `IcebergFileScan` per
file. Each scan carries the filename, record count, and a prefetched
inner `IcebergArrowStream` of Arrow IPC batches. The existing flat
`scan!` is re-implemented as a thin flattening wrapper over the same
nested pipeline so all ordering, backpressure, and stats logic lives in
one place.
- **Backwards compatible**: all existing `scan!` call sites and tests
are unchanged.
- **`IcebergScan.file_io` made non-optional**: was always set at
construction; removing `Option<>` enforces this statically and
eliminates dead `.take().ok_or_else(...)` guards in both stream ops.
- **`unexpected()` crate-wide helper**: replaces all inline
`iceberg::Error::new(iceberg::ErrorKind::Unexpected, ...)` in
`catalog.rs`, `lib.rs`, and `ordered_file_pipeline.rs`.
- **`timed!` macro**: replaces the `let start / .await / add_elapsed`
pattern for the three async phases (fetch+decode, serialize, semaphore).
Block form `timed!(field, { expr })` keeps the important computation
visually prominent.
- **`PipelineStats` improvements**: `add_elapsed` / `store_elapsed`
helpers; `file_dispatch_wait_ns` metric tracks outer-channel
backpressure (time `run_nested` is blocked waiting for the consumer to
call `next_file_scan`); `pipeline_wall_ns` is now also set for the
nested path (previously always 0, breaking the throughput display).
- **12 Rust unit tests** for all `PipelineStats` methods.
- **Julia tests**: 6 new `@testset` blocks covering basic iteration,
`nested_arrow_stream` called directly, row-count consistency vs flat
scan, correct data, builder-method compatibility, and safe early drop.

## New Julia API

```julia
# Returns a FileScanStream — one FileScan per file
stream = scan_nested!(scan)                  # or nested_arrow_stream(scan) after build!

fs_ptr = next_file_scan(stream)              # Ptr{Cvoid}, C_NULL when done
while fs_ptr != C_NULL
    fname  = file_scan_filename(fs_ptr)      # String
    nrec   = file_scan_record_count(fs_ptr)  # Int64 (before deletes)
    inner  = file_scan_arrow_stream(fs_ptr)  # borrowed ArrowStream — do NOT free_stream
    batch  = next_batch(inner)
    while batch != C_NULL
        # ... process Arrow IPC batch ...
        free_batch(batch)
        batch = next_batch(inner)
    end
    free_file_scan!(fs_ptr)                  # frees filename + inner stream
    fs_ptr = next_file_scan(stream)
end
free_file_scan_stream!(stream)
```

## Ownership rules

- `file_scan_arrow_stream` returns a **borrowed** pointer into the
`IcebergFileScan` struct. Callers must **not** call `free_stream` on it
— `free_file_scan!` handles cleanup.
- Dropping a `FileScan` without reading all inner batches is safe: the
dropped receiver causes the background file task to exit cleanly.
- Dropping the `FileScanStream` without consuming all files is equally
safe.

## Test plan

- [ ] `make test` passes (all existing tests unchanged)
- [ ] New nested-API tests pass (6 testsets)
- [ ] `cargo test` passes (12 `PipelineStats` unit tests)

Based on @hall-alex sorted iterator change.
Overarching task tracking Iceberg-load optimizations:
[RAI-49519](https://relationalai.atlassian.net/browse/RAI-49519).

🤖 Generated with [Claude Code](https://claude.ai/claude-code)

[RAI-49519]:
https://relationalai.atlassian.net/browse/RAI-49519?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: hall-alex <alex.hall@firebolt.io>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot <copilot@github.com>
## Export improvements

This PR reworks the Parquet/Iceberg write path for throughput,
cleanliness, and configurability.

**New write APIs**
- `WriterConfig` struct with configurable Parquet properties:
compression codec, dictionary encoding, plain encoding, row group size,
page size, write batch size, and statistics
- `ColumnBatch` / `write_columns` — zero-copy write path for flat column
buffers (bypasses Arrow IPC serialization)
- `GatheredBatch` / `write_columns` — gathered-column write path that
assembles columns from scattered slices (selection vectors + validity
bitmaps) directly in the calling thread
- `set_encode_workers!` to configure the encode thread pool size before
first use

**Global encode worker pool**
A single pool of N OS threads (default: `Sys.CPU_THREADS`) is shared
across all writers. Each `write_columns` / `write` call gathers or
serializes data in the calling thread, submits a `RecordBatch` to the
pool, and returns immediately — encode and Parquet I/O run on pool
threads. Per-writer ordering is preserved via a
`Mutex<ConcreteDataFileWriter>` in the shared `WriterState`: only one
pool thread encodes a given writer at a time. `close_writer` waits for
all in-flight tasks to drain before finalising the file.

This design lets Julia pipeline the gather/serialize step on the main
thread with encode work happening concurrently on pool threads, rather
than blocking end-to-end on each write.

**Tests**
Added tests for `ColumnBatch`, `GatheredBatch` (including scattered
slices, nullable columns, string columns), decimal types
(Int32/Int64/Int128 backing), and all `WriterConfig` Parquet properties.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1. Remove the MAX_FILE_CONCURRENCY = 16 constant and the guard in
create_nested_pipeline that rejected concurrency values above 16. On
machines with more than 16 cores this caused create_nested_pipeline to
bail with an error even for reasonable concurrency values, breaking
initialization.
2. Memory is already bounded per-file by the
Semaphore(MAX_BUFFERED_BYTES_PER_TASK) backpressure mechanism — the hard
cap is redundant.
3. Bump version to 0.7.20.
Runs the Rust unit test suite on every PR and push to main, alongside the
existing format check. Uses rust-cache to keep incremental build times low.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Adds a .pre-commit-config.yaml with a local hook that runs
`cargo metadata --locked` whenever Cargo.toml is staged, failing the
commit if the lock file is out of sync with the manifest.

Adds a corresponding `cargo-lock-check` job to CI.yml so the same
check runs on every PR/push without requiring pre-commit to be
installed on CI runners.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants