Skip to content

Add nested incremental file-scan pipeline#97

Merged
robertbuessow merged 25 commits intomainfrom
rb-split-scan-4
May 6, 2026
Merged

Add nested incremental file-scan pipeline#97
robertbuessow merged 25 commits intomainfrom
rb-split-scan-4

Conversation

@robertbuessow
Copy link
Copy Markdown
Contributor

@robertbuessow robertbuessow commented May 4, 2026

Summary

  • Extends the nested file-scan pipeline (stream of per-file streams) to incremental scans via scan_incremental_nested! — each item in the outer stream carries the filename, record count, and a per-file Arrow batch stream
  • Adds end-to-end and orchestration tests for the nested pipeline

Test plan

  • make run-containers then make test — full Julia test suite including nested scan tests
  • cargo test in iceberg_rust_ffi/ — Rust unit/integration tests including incremental pipeline tests

🤖 Generated with Claude Code
Overarching task tracking Iceberg-load optimizations: RAI-49519.

hall-alex and others added 17 commits April 6, 2026 16:33
Replace the to_arrow() code path with a custom file-parallel pipeline
that processes N parquet files concurrently while preserving strict
file-then-row ordering for Julia consumption.

Architecture:
- plan_files() provides an ordered list of FileScanTasks
- Each file task runs as a background tokio task with its own mpsc
  channel and per-file Semaphore(100MB) for backpressure
- FuturesOrdered yields per-file receivers in file order; the consumer
  drains file 0 batch-by-batch, then file 1, etc.
- Each file uses the same iceberg-rs ArrowReader code path as before

Changes:
- New: ordered_file_pipeline.rs (~420 lines incl. profiling stats)
- full.rs: IcebergScan gains file_io, batch_size, file_concurrency
  fields; iceberg_arrow_stream uses plan_files() + pipeline
- scan_common.rs / incremental.rs: pass through new fields in macros
- Temporary PipelineStats with FFI print_summary for benchmarking

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Exposes a new iceberg_file_scan_stream / scan_nested! API that yields one
IcebergFileScan per file, each carrying the filename, record count, and a
prefetched inner IcebergArrowStream of Arrow IPC batches. The existing flat
scan! path is kept by re-implementing it as a thin flattening wrapper over
the same nested pipeline, so all ordering, backpressure, and stats logic
lives in one place.

Key changes:
- ordered_file_pipeline: add FileScan (internal type), make_file_stream,
  spawn_file_task_with_meta, run_nested / create_nested_pipeline; rewrite
  create_pipeline to flatten the nested pipeline. Switch run_nested from
  FuturesOrdered to FuturesUnordered (spawn futures resolve immediately so
  ordering is irrelevant here).
- table.rs: add IcebergFileScan (#[repr(C)]), IcebergFileScanStream,
  IcebergFileScanResponse; add iceberg_file_scan_free,
  iceberg_file_scan_stream_free, iceberg_file_scan_record_count,
  iceberg_file_scan_filename (sync getters), iceberg_next_file_scan (async).
- full.rs: add iceberg_file_scan_stream export op; make IcebergScan.file_io
  non-optional (FileIO instead of Option<FileIO>) — always set at
  construction, use .clone() in stream ops instead of .take().
- Julia: scan_nested!, nested_arrow_stream, next_file_scan,
  file_scan_record_count, file_scan_filename, file_scan_arrow_stream,
  free_file_scan!, free_file_scan_stream! exported from RustyIceberg.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Replace all inline iceberg::Error::new(iceberg::ErrorKind::Unexpected, ...)
callsites with a crate-wide pub(crate) helper in lib.rs, removing the
boilerplate from catalog.rs, lib.rs, and ordered_file_pipeline.rs.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add add_elapsed, store_elapsed, and async timed methods to PipelineStats
to eliminate the repeated start/record/elapsed boilerplate. The three async
phases (fetch+decode, serialize, semaphore) now read as single timed()
expressions; the sync reader-setup phase uses add_elapsed directly.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Tests (test/scan_tests.jl):
- Basic iteration: verify each FileScan has a non-empty .parquet filename,
  record_count > 0, and the nations table totals exactly 25 records.
- Row count consistency: nested and flat pipelines over the customer table
  must produce identical total row counts.
- Correct data: nested scan over nations produces the same rows as the
  existing flat-scan reference test.
- Builder methods: select_columns! and with_batch_size! are respected
  per-batch in the nested path.
- Safe early drop: abandon a FileScan mid-stream and drop the outer stream
  early; background file tasks see the dropped receiver and exit cleanly.

Also fixes the timed! macro call for the serialize phase: the block form
{ expr.await.map_err(...)? } was incorrectly evaluating to () before the
macro's own .await ran. The .map_err chains now appear outside the macro
invocation where they belong.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ll time

PipelineStats unit tests (12 tests):
- reset_clears_every_field, task_start/end concurrency tracking, peak
  high-water mark, buffer add/release/peak, add_elapsed accumulation,
  store_elapsed overwrite, field isolation.

Dispatch backpressure metric:
- Rename dead consumer_wait_ns → file_dispatch_wait_ns. Track time
  run_nested spends blocked on the outer channel (slow consumer) using
  timed!(file_dispatch_wait_ns, tx.send(...)). Shown as "dispatch stall"
  in print_summary.

Nested pipeline wall time:
- run_nested now records pipeline_wall_ns when it finishes dispatching all
  FileScan handles. For the flat path run_flat overwrites it with the full
  end-to-end time; for the nested path this gives a non-zero throughput
  in print_summary instead of always showing 0.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot <copilot@github.com>
Adds a new scan configuration parameter that controls how many completed
FileScan items are queued in the outer channel of the nested pipeline,
decoupling file-I/O concurrency from how far ahead Rust prefetches for
the Julia consumer.

Previously the outer channel capacity was hardcoded to `file_concurrency`,
coupling the two. Now they are independent: `file_concurrency` sets how
many files are processed in parallel, while `file_prefetch_depth` controls
how many ready FileScan items can buffer up before run_nested blocks.
When `file_prefetch_depth` is 0 (default), it falls back to `file_concurrency`
preserving existing behaviour.

Changes:
- IcebergScan/IcebergIncrementalScan: add file_prefetch_depth field
- scan_common.rs: propagate field through all struct-reconstructing macros;
  add impl_with_file_prefetch_depth! macro
- full.rs: wire iceberg_scan_with_file_prefetch_depth FFI function; resolve
  prefetch_depth in both iceberg_arrow_stream and iceberg_file_scan_stream
- ordered_file_pipeline: add prefetch_depth parameter to create_nested_pipeline
  and create_pipeline; use it for the outer FileScan channel capacity
- Julia: with_file_prefetch_depth! was already defined; move export to correct line

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Extract resolve_pipeline_params() in full.rs to deduplicate the
  concurrency/prefetch_depth/file_io/batch_size resolution logic shared
  by iceberg_arrow_stream and iceberg_file_scan_stream sync blocks
- Update pipeline_stats.rs header comment to remove "temporary / will be
  removed" framing — the module is intended to stay

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Runs the Rust unit test suite on every PR and push to main, alongside the
existing format check. Uses rust-cache to keep incremental build times low.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Adds a per-file nested pipeline for incremental scans, mirroring the
full scan's scan_nested! / iceberg_file_scan_stream API. Returns a
(FileScanStream, ArrowStream) pair: one FileScan per appended parquet
file in manifest order, plus a flat delete stream.

Implementation:
- New incremental_pipeline.rs: reads each AppendedFileScanTask via the
  StreamsInto machinery (one-element stream + empty delete stream), with
  the same semaphore/backpressure pattern as the full scan pipeline.
  Delete stream uses StreamsInto with an empty append stream.
- ordered_file_pipeline.rs: extract generic run_nested_pipeline<T,S,F>
  that both pipelines share. Also promote BufferedBatch, MAX_FILE_CONCURRENCY,
  and make_file_stream to pub(crate) with an on_release callback so callers
  can opt in to STATS tracking. Full scan run_nested becomes a thin wrapper.
- incremental.rs: populate file_io from table at scan creation, add
  IcebergIncrementalFileScanStreamResponse, iceberg_incremental_file_scan_stream.
- Julia: IncrementalFileScanStreamResponse, nested_incremental_arrow_stream,
  scan_incremental_nested!. Reuses existing next_file_scan / file_scan_*
  helpers from the full scan without any new Julia machinery.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Adds 26 Rust unit tests across three modules:

ordered_file_pipeline (13 tests):
- Orchestration tests using spawn_empty (no I/O): empty input, single
  file, source ordering with concurrency=1, all files present with
  concurrency>N, stream errors at seeding and during drain, spawn
  errors, consumer dropping the receiver.
- End-to-end test (full_pipeline_reads_parquet_file): writes a 3-row
  parquet file to in-memory FileIO, runs create_nested_pipeline, and
  decodes the Arrow IPC output to verify [10,20,30]. Exercises reader
  setup, fetch/decode, spawn_blocking serialization, semaphore
  backpressure, and make_file_stream permit release.

incremental_pipeline (4 tests):
- concurrency_above_cap_returns_error: validates the hard cap guard.
- empty_streams_succeed: baseline, no I/O.
- append_stream_reads_parquet_file: full end-to-end append path through
  read_one_append_file (StreamsInto), serialization, and semaphore.
  Verifies filename, record count, and row values; delete side verified
  empty.
- delete_stream_yields_deleted_file_positions: DeletedFile variant with
  record_count=3 produces 3 (file_path, pos) rows via StreamsInto
  without reading any file. Append side verified empty.

Also adds bytes = "1" dev-dependency (needed for in-memory parquet
write in tests) and refactors linter-suggested cleanups in full.rs,
pipeline_stats.rs, and table.rs.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
# Conflicts:
#	iceberg_rust_ffi/src/incremental.rs
#	iceberg_rust_ffi/src/lib.rs
#	iceberg_rust_ffi/src/ordered_file_pipeline.rs
#	src/RustyIceberg.jl
#	src/full.jl
#	test/scan_tests.jl
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 4, 2026

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 93.33333% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 82.66%. Comparing base (cdce472) to head (769cc6e).
⚠️ Report is 6 commits behind head on main.

Files with missing lines Patch % Lines
src/incremental.jl 93.33% 1 Missing ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #97      +/-   ##
==========================================
- Coverage   84.42%   82.66%   -1.77%     
==========================================
  Files           9        9              
  Lines         873     1015     +142     
==========================================
+ Hits          737      839     +102     
- Misses        136      176      +40     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Collaborator

@hall-alex hall-alex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only had a very quick look. Please ping if you need a detailed review. Slightly surprised that the change is relatively large 😬🤷‍♂️.

My general comment would be that it would be really nice to try and further unify the append and the full load case. Not at all sure how much is possible though. Just seems like there might be some boilerplate that can possibly be shared.

From the testing side: would it be easy to run all full-load tests also with the incremental pipeline and just set the start-snapshot-id to null?

Comment thread iceberg_rust_ffi/src/incremental.rs Outdated
);

/// Response for iceberg_incremental_file_scan_stream — one FileScanStream for
/// appends (per-file, ordered) and one flat ArrowStream for deletes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a bit more background here. A la "why is only one a FileScanStream". The deletes mean we always need to keep the ArrowStream API as well, right? Perhaps worth mentioning.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded — the comment now explains that deletes stay flat because position-delete records are already keyed by (file_path, pos) and don't benefit from per-file grouping.

Comment on lines +264 to +297
if scan.is_null() {
return Err(anyhow::anyhow!("Null scan pointer provided"));
}
let scan_ptr = unsafe { &*scan };
let scan_ref = &scan_ptr.scan;
if scan_ref.is_none() {
return Err(anyhow::anyhow!("Incremental scan not initialized"));
}
let file_io = scan_ptr
.file_io
.clone()
.ok_or_else(|| anyhow::anyhow!("FileIO not available; create scan from table"))?;

let concurrency = if scan_ptr.file_concurrency == 0 {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
} else {
scan_ptr.file_concurrency
};
let prefetch_depth = if scan_ptr.file_prefetch_depth == 0 {
concurrency
} else {
scan_ptr.file_prefetch_depth
};
let serialization_concurrency = if scan_ptr.serialization_concurrency == 0 {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
} else {
scan_ptr.serialization_concurrency
};
let batch_size = scan_ptr.batch_size;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some / a lot of this is probably boiler-plate which is identical to the full-load case. Let's extract a helper and use it here in the full-load case. Perhaps also for things like

           std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)

extract little helpers. We probably have that in a bunch of places elsewhere...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added cpu_count() helper in lib.rs and resolve_incremental_pipeline_params in incremental.rs, replacing all three available_parallelism blocks in the nested-scan op.

Comment on lines +68 to +111
async fn process_incremental_file_inner(
task: AppendedFileScanTask,
reader: ArrowReader,
semaphore: &Arc<Semaphore>,
tx: &mpsc::Sender<Result<BufferedBatch, iceberg::Error>>,
) -> Result<(), iceberg::Error> {
let batch_stream =
read_one_append_file(reader, task).map_err(|e| unexpected(format!("reader setup: {e}")))?;
tokio::pin!(batch_stream);

loop {
let batch = match batch_stream.next().await {
Some(Ok(b)) => b,
Some(Err(e)) => return Err(e),
None => break,
};

let serialized = tokio::task::spawn_blocking(move || crate::serialize_record_batch(batch))
.await
.map_err(|e| unexpected(format!("serialize panicked: {e}")))?
.map_err(unexpected)?;

let byte_len = serialized.length;

let _permit = semaphore
.acquire_many(byte_len as u32)
.await
.map_err(|e| unexpected(format!("semaphore: {e}")))?;
std::mem::forget(_permit);

if tx
.send(Ok(BufferedBatch {
batch: serialized,
byte_len,
semaphore: semaphore.clone(),
}))
.await
.is_err()
{
return Ok(());
}
}
Ok(())
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this similar to the full-load case. If yes, let's extract a helper. Possibly also for other methods here. It would imo be extremely useful to see very quickly the exact (hopefully minimal) differences between append and full load.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialize + semaphore backpressure loop is the same pattern as ordered_file_pipeline. The only structural difference is how the batch stream is obtained (full scan: ArrowReaderBuilder directly; incremental: wraps the task in a one-element stream and calls StreamsInto). Extracting a shared helper for the serialize+backpressure step is a separate refactor — happy to do it as a follow-up if useful.

None => break,
};

let serialized = tokio::task::spawn_blocking(move || crate::serialize_record_batch(batch))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reuse here the same thread pool that you created for the full scan?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — incremental_pipeline.rs now dispatches via ordered_file_pipeline::SERIALIZE_POOL (made pub(crate)) instead of spawn_blocking.

Exposes iceberg_incremental_scan_with_file_prefetch_depth via the
impl_with_file_prefetch_depth! macro in incremental.rs, adds the
Julia wrapper with docstring in incremental.jl, and covers it with a
test that exercises the nested scan path (scan_incremental_nested!)
to verify prefetch depth is accepted and data is returned correctly.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@robertbuessow robertbuessow requested review from gbrgr and hall-alex May 6, 2026 11:23
robertbuessow and others added 3 commits May 6, 2026 11:32
…eline_params, SERIALIZE_POOL

Extract cpu_count() into lib.rs to replace all inline available_parallelism()
calls across full.rs, incremental.rs, and ordered_file_pipeline.rs.

Add resolve_incremental_pipeline_params() in incremental.rs, mirroring
resolve_pipeline_params in full.rs, to deduplicate the three repeated
zero-default blocks in the nested-scan export_runtime_op.

Expand the IcebergIncrementalFileScanStreamResponse comment to explain
why appends use a FileScanStream (per-file ordering) while deletes stay
as a flat ArrowStream (position-delete records are keyed by file_path+pos
and don't benefit from per-file grouping).

Switch incremental_pipeline.rs from tokio::task::spawn_blocking to the
shared SERIALIZE_POOL (pub(crate)) from ordered_file_pipeline.rs, so both
full and incremental paths use the same process-global rayon pool.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Phases 2-4 (fetch/decode timing, rayon serialization, semaphore
backpressure, channel send) are now in a single pub(crate) function
drain_batch_stream in ordered_file_pipeline.rs. process_file_inner and
process_incremental_file_inner both delegate to it, removing ~43 lines
of duplication.

The incremental path now gets full timed!/STATS coverage — fetch_decode_ns,
serialize_ns, semaphore_wait_ns, batches_produced, bytes_produced, and
files_completed — matching the full-scan path.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
) -> Result<(), iceberg::Error> {
loop {
// ── Phase 2: Fetch + decode ─────────────────────────────────────
// Each .next() call fetches compressed parquet pages from storage,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of IMO useful comments were dropped. Can you re-add them?

I really don't know why Claude still just drops comments.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored the comments

Brings back the phase-level inline comments that were lost during the
drain_batch_stream extraction, and adds a comment to the incremental
auto-detect concurrency block.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@robertbuessow robertbuessow enabled auto-merge (squash) May 6, 2026 15:19
@robertbuessow robertbuessow merged commit f4425f4 into main May 6, 2026
4 checks passed
@robertbuessow robertbuessow deleted the rb-split-scan-4 branch May 6, 2026 15:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants