Skip to content

feat: introduce pluggable SpillFile trait and TempFileFactory for custom spill backends#21882

Open
pantShrey wants to merge 2 commits into
apache:mainfrom
pantShrey:abstract-spill-file
Open

feat: introduce pluggable SpillFile trait and TempFileFactory for custom spill backends#21882
pantShrey wants to merge 2 commits into
apache:mainfrom
pantShrey:abstract-spill-file

Conversation

@pantShrey

@pantShrey pantShrey commented Apr 27, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

DataFusion’s spill infrastructure is tightly coupled to OS-level files, with no extension points for alternative storage backends. DiskManager cannot be customized for file creation, and IPCStreamWriter depends on OS file paths.
This prevents integration in environments where temporary storage must be managed by the host system. For example, Postgres extensions (e.g., ParadeDB) require spill files to go through BufFile APIs to respect temp_tablespaces, enforce temp_file_limit, and integrate with transaction-scoped cleanup. Since BufFile has no OS-visible path, it cannot work with the current design.
A secondary motivation raised by @alamb is supporting object storage backends (S3, GCS) for spilling, which require async IO and cannot use std::io::Write or std::io::Read.

What changes are included in this PR?

  • Introduced SpillFile, SpillWriter, and TempFileFactory traits to abstract spill file handling
  • Added DiskManagerMode::Custom to allow pluggable backends
  • Updated DiskManager to return Arc<dyn SpillFile> instead of OS-bound types
  • Refactored write path using SpillWriteAdapter to bridge sync Arrow writers with backend-agnostic writers
  • Refactored read path to use async streaming (Stream<Item = Result<Bytes>>) instead of blocking state machines
  • Updated spill-related components to operate on Arc<dyn SpillFile>
  • Migrated the Sort-Merge Join (SMJ) operator to use the async spill abstraction

Are these changes tested?

Yes. Existing spill tests cover the full read/write flow.

  • Fixed test_disk_usage_decreases_as_files_consumed by correcting a pre-existing off-by-one assumption in file rotation
  • Fixed test_preserve_order_with_spilling by just asserting spilling occurs (spill_count>0) and output batches are sorted

Are there any user-facing changes?

Yes this introduces API changes:

  • Spill-related APIs now use Arc<dyn SpillFile> instead of RefCountedTempFile
  • New public traits: SpillFile, SpillWriter, TempFileFactory
  • Added DiskManagerMode::Custom for custom backends

Custom spill backends can now be implemented and plugged in via DiskManager.

@github-actions github-actions Bot added execution Related to the execution crate physical-plan Changes to the physical-plan crate labels Apr 27, 2026
@pantShrey

pantShrey commented Apr 27, 2026

Copy link
Copy Markdown
Contributor Author

@alamb I opened this draft PR to get early feedback on the architecture.

  1. The first point is around the sync read path. I introduced open_sync_reader because SortMergeJoin currently has synchronous, blocking code paths that directly open files using paths and BufReader, instead of going through the spill abstractions. Converting this to fully async would significantly increase the scope of this PR.

    • Does it make sense to keep this escape hatch for now and handle making these operators async in a follow-up PR?
  2. The second point is regarding test failures. I have not modified the original 64B limit in the tests because I wanted guidance here. Currently, the repartition test in mod.rs is failing, and it seems related to spilling not being triggered correctly, the new SpillWriteAdapter adds slight allocation overhead which makes the original 64-byte memory limit too tight for the merge heap to initialize (~296 bytes needed), bumping up the memory limit causes the test to not spill anymore, I believe increasing the test data size might solve the issue, but am not sure.

I might be missing something here, so would really appreciate your guidance.

@alamb

alamb commented May 7, 2026

Copy link
Copy Markdown
Contributor

Thanks -- will try and look at this shortly

@alamb

alamb commented May 9, 2026

Copy link
Copy Markdown
Contributor

@alamb I opened this draft PR to get early feedback on the architecture.

  1. The first point is around the sync read path. I introduced open_sync_reader because SortMergeJoin currently has synchronous, blocking code paths that directly open files using paths and BufReader, instead of going through the spill abstractions. Converting this to fully async would significantly increase the scope of this PR.

    • Does it make sense to keep this escape hatch for now and handle making these operators async in a follow-up PR?

Kind of, though it seems like accumulating technical debt as we'll have APIs that will not be needed once we complete the work for SortMergeJoin

What do you think about making a first PR to migrate SortMergeJoin to use the spill abstraction?

  1. The second point is regarding test failures. I have not modified the original 64B limit in the tests because I wanted guidance here. Currently, the repartition test in mod.rs is failing, and it seems related to spilling not being triggered correctly, the new SpillWriteAdapter adds slight allocation overhead which makes the original 64-byte memory limit too tight for the merge heap to initialize (~296 bytes needed), bumping up the memory limit causes the test to not spill anymore, I believe increasing the test data size might solve the issue, but am not sure.

Makes sense to me

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pantShrey - I reviewed this and the basic idea looks good to me. I do think it would be nice to have a unified (async) IO abstraction rather than leaving some hook around for sync IO and making this API more complicated

used_disk_space: Arc<AtomicU64>,
/// Number of active temporary files created by this disk manager
active_files_count: Arc<AtomicUsize>,
/// Custom Backend

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small nit: I think "custom" is a somewhat unecessary term here . Perhaps this

    factory: Option<Arc<dyn TempFileFactory>>,

or

    temp_file_factory: Option<Arc<dyn TempFileFactory>>,

would be more consistent with the rest of the codebase


#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub enum DiskManagerMode {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to permit a TempFileFactory here, I wonder if it would make sense (maybe as a follow on PR) here to move everything over

So like

pub enum DiskManagerMode {
   Custom(Arc<dyn TempFileFactory>),
}

And then implement a basic DirectoriesTempFileFactor and a NoTempFilesFactory that are provided along with datafusion

That might simplify the code, and it would also ensure the TempFileFactory API is rich enough for the existing temp file strategies

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this is not a super useful thing to do at the moment

.collect()
}

pub struct OsSpillWriter {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "file spill writer"?

/// Writer for spill file backends.
/// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter.
pub trait SpillWriter: Send {
fn write(&mut self, data: Bytes) -> Result<()>;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty similar to https://doc.rust-lang.org/std/io/trait.Write.html 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. The reason I didn't use Write trait which uses &[u8] was for ownership reasons. Some backends might queue chunks to a background task (e.g., S3 multipart via a channel) and need to hold the data past the write() call's return. &[u8] can't express that, and it would force a second copy between the SpillWriteAdapter and the SpillWriter.
Also, the custom SpillWriter trait contains finish(), which maps perfectly to complete_multipart_upload for S3 and resource owner cleanup for Postgres.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all true -- however, I think that since the underlying IPC writer takes a std::io::Write, forcing all backends to use Bytes will likely require an extra unecessary copy (see comments below on SpillWriterAdapter) anways.

If you use a std::io::write like interface here, backends that want to queue chunks can do so (by copying into Bytes buffers themselves)

Thus what i suggest is:

  1. Change this to look more like std::io::wrote:
    fn write(&mut self, data: &[u8]) -> Result<()>;

Which will allow you to get rid of the write adapter

@pantShrey

Copy link
Copy Markdown
Contributor Author

@alamb Thank you so much for the review! I scoped out the SortMergeJoin migration today, specifically looking at bitwise_stream.rs and process_key_match_with_filter, to see what it would take.

Because SortMergeJoin currently reads from the spill file via a synchronous for loop inside a hand-rolled poll state machine, making the read path truly async requires a major rewrite. We can't just .await the stream, so we may need to store the SendableRecordBatchStream in the execution state and manually persist variables like matched_count across Poll::Pending yields.

Because ParadeDB is hoping to unblock their Postgres integration next week, I'm worried a state machine rewrite of this scale will stall them.

Would you be open to merging this core abstraction first (with open_sync_reader marked as #[deprecated])? I can open a dedicated tracking issue for the SortMergeJoin async migration and tackle it as a fast follow-up PR.

I am happy to defer to your judgment if you feel the tech debt must be addressed first!

@alamb

alamb commented May 12, 2026

Copy link
Copy Markdown
Contributor

I am happy to defer to your judgment if you feel the tech debt must be addressed first!

How about we try it in parallel?

@pantShrey

pantShrey commented May 12, 2026

Copy link
Copy Markdown
Contributor Author

I am happy to defer to your judgment if you feel the tech debt must be addressed first!

How about we try it in parallel?

@alamb sure, i have already started to work on that locally while waiting for the response

also i am actually still stuck on the test repartition::test::test_preserve_order_with_spilling

The issue stems from the fact that RepartitionMerge now requires more memory than a RepartitionExec node, this greedily allocates memory to RepartitionExec which could have spilled instead of RepartitionMerge which cannot spill.

I would really appreciate any guidance on this, am I missing something obvious here?

@alamb

alamb commented May 12, 2026

Copy link
Copy Markdown
Contributor

test_preserve_order_with_spilling

Sadly I am not familar with this test so I don't have a lot to offer you

Maybe you can look at git history and see who introduced the test and maybe they might have some ideas

@pantShrey

Copy link
Copy Markdown
Contributor Author

Hey @adriangb, Andrew suggested I reach out to you since you originally authored repartition::test::test_preserve_order_with_spilling. I'm currently hitting a wall with it while migrating the spilling architecture to async streams.

The test is currently stuck in a memory-accounting deadlock. Here’s what is happening:

  • If I set the memory pool limit tight enough to force a spill, RepartitionMerge panics during initialization. It needs to reserve some memory to set up its streams, but exhausts the pool before completing its unspillable setup.

  • However, if I increase the pool limit to give Merge enough headroom to initialize safely and then scale up the data volume to force overflow, the RepartitionExec producers greedily consume the additional memory first. This either ends up starving Merge again or allows the query to complete entirely in memory without triggering a spill.

I was able to trigger a spill once by setting the test memory limit to 608 B, but even that was not sufficient for the test to pass reliably.

Is there a correct or idiomatic way to configure this test (batch sizes, data volume, memory pool limits, etc.) to reliably force a RepartitionExec spill without violating the Merge operator’s baseline initialization overhead? Or am I approaching this incorrectly and missing something obvious?

I would really appreciate any guidance you could provide.

@adriangb

Copy link
Copy Markdown
Contributor

IIRC that test was added when we added spilling to RepartitionExec. Conceptually the test is simple: if RepartitionExec is configured to preserve order and it spills we need to make sure that spilling did not shuffle the data. The orchestration however is difficult: forcing a RepartitionExec to spill usually requires skewed upstream partition consumption rates. You could try to change the test to eg use a GroupBy or maybe we can use a RepartitionExec in isolation if we pull from the streams in the right way. I think the structure can be changed quite a bit as long as we preserve the semantic meaning of the test, I am not surprised that it is pretty fragile to changes.

@pantShrey pantShrey force-pushed the abstract-spill-file branch from 2971e41 to de6697f Compare May 13, 2026 12:56
@pantShrey

Copy link
Copy Markdown
Contributor Author

@alamb I’ve addressed the nits and force-pushed the updates. Could you please trigger the CI and take another look when you have a moment? In the meantime, I am working on migrating SortMergeJoin to the new spill abstractions in parallel so that both can be reviewed quickly. Thank you again for your time!

@pantShrey

Copy link
Copy Markdown
Contributor Author

@adriangb Thank you so much for the guidance! I updated the test to simply assert that a spill does occur
(spill_count > 0) and that the batch output order remains perfectly sorted, rather than trying to force every single batch to spill. I hope this aligns with the semantic purpose you originally envisioned for the test. I really appreciate your help getting me unstuck here!

@adriangb

Copy link
Copy Markdown
Contributor

@adriangb Thank you so much for the guidance! I updated the test to simply assert that a spill does occur (spill_count > 0) and that the batch output order remains perfectly sorted, rather than trying to force every single batch to spill. I hope this aligns with the semantic purpose you originally envisioned for the test. I really appreciate your help getting me unstuck here!

That makes sense to me.

@github-actions

github-actions Bot commented May 13, 2026

Copy link
Copy Markdown

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
error: `cargo metadata` exited with an error:     Updating crates.io index
error: failed to get `windows` as a dependency of package `sysinfo v0.39.3`
    ... which satisfies dependency `sysinfo = "^0.39.3"` (locked to 0.39.3) of package `datafusion v54.0.0 (/home/runner/work/datafusion/datafusion/datafusion/core)`

Caused by:
  failed to load source for dependency `windows`

Caused by:
  unable to update registry `crates-io`

Caused by:
  download of wi/nd/windows failed

Caused by:
  curl failed

Caused by:
  [55] Failed sending data to the peer (OpenSSL SSL_read: SSL_ERROR_SYSCALL, errno 0)

@github-actions github-actions Bot added the auto detected api change Auto detected API change label May 13, 2026
@pantShrey

Copy link
Copy Markdown
Contributor Author

cargo-semver-checks flagged DiskManagerMode::Custom as a breaking change since the enum isn't
#[non_exhaustive]. Happy to add it if preferred, but wanted to check first since it would affect downstream users matching on this enum.

@pantShrey pantShrey force-pushed the abstract-spill-file branch from e31bff4 to 086632a Compare May 14, 2026 19:38
@pantShrey

Copy link
Copy Markdown
Contributor Author

Hey @alamb, quick update! While working on the SortMergeJoin async migration in parallel, I realised the changes were actually quite contained (~260 insertions, ~170 deletions). Rather than opening a second stacked PR and temporarily introducing the open_sync tech debt to main, I went ahead and rolled the refactor directly into this PR to keep things clean. I hope this approach is okay with you!

I believe the PR is now ready for review, so I've marked it as such. I'd appreciate another look whenever you have the time. Thank you!

@pantShrey pantShrey marked this pull request as ready for review May 14, 2026 20:08
@alamb

alamb commented May 15, 2026

Copy link
Copy Markdown
Contributor

Rather than opening a second stacked PR and temporarily introducing the open_sync tech debt to main, I went ahead and rolled the refactor directly into this PR to keep things clean. I hope this approach is okay with you!

Grerat-- can you please make a PR for just the SMJ refactor and then stack this PR on it?

@alamb

alamb commented May 15, 2026

Copy link
Copy Markdown
Contributor

That will make it easier / faster to review (I am not a SMJ expert so I can't really review that part effiicently)

@pantShrey

Copy link
Copy Markdown
Contributor Author

Hey @alamb, quick update!

I've reworked both PRs to make them easier to review independently:

  • refactor: Update SortMergeJoin to use async spill abstractions #22230 (SMJ refactor) no longer depends on this PR, it now migrates SortMergeJoin to async spill abstractions while keeping the concrete RefCountedTempFile type, so it can be reviewed and merged standalone.
  • This PR now only does one focused thing: introduces the SpillFile trait + TempFileFactory and swaps the internal type from RefCountedTempFileArc<dyn SpillFile> and internal changes in the spill module. I've also removed the open_sync tech debt that was here before, and have added the skip validation in streamdecoder

The plan is for #22230 to merge first, then I'll rebase this on top of it. Would be grateful if you could take a look whenever you get the chance!

@pantShrey pantShrey marked this pull request as ready for review May 19, 2026 14:17
@alamb alamb added the review:waiting Ready for an initial review by a committer label May 20, 2026
pull Bot pushed a commit to buraksenn/datafusion that referenced this pull request Jun 16, 2026
…e#22230)

~~## Note: This PR depends on apache#21882 (pluggable SpillFile trait) and
cannot be merged before it. Opening in parallel per @alamb's suggestion
for easier review. The required SpillFile trait used here is defined in
that base PR.To review locally, apply apache#21882 first and then stack this
branch on top.~~
**Update:** This PR has been rebased to use the existing
`RefCountedTempFile` and is now completely standalone. It can be
reviewed and merged independently

## Which issue does this PR close?

- Contributes to apache#21215 (and is required by apache#21882)

## Rationale for this change

`materializing_stream.rs` and `bitwise_stream.rs` were reading spilled
batches via `open_sync_reader` / direct `File::open` calls ~~, bypassing
the `SpillFile` abstraction introduced in apache#21882~~. This PR migrates
both to use `SpillManager::read_spill_as_stream`. This safely converts
the SMJ to an async I/O path, preparing the ground for custom backends
(Postgres BufFile, object storage) to handle spill reads without
requiring an OS file path.

## What changes are included in this PR?

- `materializing_stream.rs`: Eagerly restores spilled `BufferedBatches`
via async streams before freezing, avoiding new state machine variants.
- `bitwise_stream.rs`: Replaces sync reads with an async
`poll_next_unpin` loop, caching the stream to survive `Poll::Pending`.
~~- `spill_file.rs`: Removes `open_sync_reader` from the `SpillFile`
trait (no longer needed).~~

## Are these changes tested?

Covered by existing SMJ tests. No new tests added, the behavioral change
is internal (sync → async IO path).

## Are there any user-facing changes?

No.
~~Removes `open_sync_reader` from the SpillFile trait, this is a
breaking API change for anyone implementing the trait, but the trait was
introduced in apache#21882 which has not merged yet so there are no external
implementors.~~

---------

Co-authored-by: Kumar Ujjawal <ujjawalpathak6@gmail.com>
@pantShrey pantShrey force-pushed the abstract-spill-file branch from 6954b55 to d71a415 Compare June 16, 2026 19:18
@pantShrey

Copy link
Copy Markdown
Contributor Author

@alamb #22230 Has been merged! I've also rebased this PR on top of latest main, so it is ready for your review whenever you have the time.

@alamb

alamb commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

run benchmark spill_io

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4745820494-593-2hs7c 6.12.68+ #1 SMP Sat May 2 07:49:07 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing abstract-spill-file (d71a415) to 6176a6d (merge-base) diff using: spill_io
Results will be posted here when complete


File an issue against this benchmark runner

@alamb

alamb commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

run benchmark external_aggr

@alamb

alamb commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

run benchmark smj

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4745858955-595-vw4pm 6.12.68+ #1 SMP Sat May 2 07:49:07 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing abstract-spill-file (d71a415) to 6176a6d (merge-base) diff using: smj
Results will be posted here when complete


File an issue against this benchmark runner

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4745858585-594-ltzkc 6.12.68+ #1 SMP Sat May 2 07:49:07 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing abstract-spill-file (d71a415) to 6176a6d (merge-base) diff using: external_aggr
Results will be posted here when complete


File an issue against this benchmark runner

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected
Details

group                                  HEAD                                   abstract-spill-file
-----                                  ----                                   -------------------
spill_compression/q16/lz4_frame        1.00     34.7±8.23ms        ? ?/sec    1.69     58.5±0.95ms        ? ?/sec
spill_compression/q16/uncompressed     1.00     36.0±4.52ms        ? ?/sec    2.33     83.9±1.97ms        ? ?/sec
spill_compression/q16/zstd             1.00     63.3±2.66ms        ? ?/sec    1.23     78.1±1.11ms        ? ?/sec
spill_compression/q2/lz4_frame         1.00     19.2±4.03ms        ? ?/sec    1.42     27.2±0.50ms        ? ?/sec
spill_compression/q2/uncompressed      1.00     18.2±4.51ms        ? ?/sec    2.11     38.5±1.14ms        ? ?/sec
spill_compression/q2/zstd              1.00    32.0±10.07ms        ? ?/sec    1.05     33.7±0.55ms        ? ?/sec
spill_compression/q20/lz4_frame        1.00     25.8±3.47ms        ? ?/sec    1.51     39.0±0.53ms        ? ?/sec
spill_compression/q20/uncompressed     1.00     23.7±4.39ms        ? ?/sec    2.19     51.8±1.06ms        ? ?/sec
spill_compression/q20/zstd             1.00     46.5±1.73ms        ? ?/sec    1.13     52.6±0.55ms        ? ?/sec
spill_compression/wide/lz4_frame       1.00     89.9±5.61ms        ? ?/sec    1.69    151.9±2.76ms        ? ?/sec
spill_compression/wide/uncompressed    1.00    100.1±7.76ms        ? ?/sec    2.36    236.5±4.43ms        ? ?/sec
spill_compression/wide/zstd            1.00    160.8±4.77ms        ? ?/sec    1.26    202.4±3.04ms        ? ?/sec
spill_io/StreamReader/read_100/        1.00     50.7±9.84ms        ? ?/sec    3.11    157.9±2.85ms        ? ?/sec

Resource Usage

spill_io — base (merge-base)

Metric Value
Wall time 375.1s
Peak memory 203.8 MiB
Avg memory 47.6 MiB
CPU user 121.5s
CPU sys 41.5s
Peak spill 0 B

spill_io — branch

Metric Value
Wall time 395.1s
Peak memory 294.5 MiB
Avg memory 77.8 MiB
CPU user 170.3s
CPU sys 81.9s
Peak spill 0 B

File an issue against this benchmark runner

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected
Details

Comparing HEAD and abstract-spill-file
--------------------
Benchmark smj.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃                                  HEAD ┃                  abstract-spill-file ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │           8.52 / 9.04 ±0.35 / 9.44 ms │          8.45 / 8.73 ±0.25 / 9.17 ms │     no change │
│ QQuery 2  │     170.82 / 173.15 ±1.56 / 175.20 ms │    171.88 / 175.55 ±3.78 / 182.56 ms │     no change │
│ QQuery 3  │     106.61 / 108.88 ±2.56 / 113.80 ms │    105.91 / 108.46 ±1.69 / 110.44 ms │     no change │
│ QQuery 4  │        27.97 / 28.23 ±0.33 / 28.88 ms │       28.22 / 28.43 ±0.21 / 28.83 ms │     no change │
│ QQuery 5  │        21.59 / 21.81 ±0.17 / 22.02 ms │       21.45 / 21.84 ±0.31 / 22.29 ms │     no change │
│ QQuery 6  │     171.61 / 177.06 ±3.72 / 182.16 ms │    167.35 / 170.44 ±1.70 / 172.23 ms │     no change │
│ QQuery 7  │     208.67 / 213.11 ±4.00 / 220.22 ms │    207.53 / 212.78 ±4.55 / 220.96 ms │     no change │
│ QQuery 8  │        20.05 / 20.51 ±0.36 / 21.16 ms │       20.40 / 21.66 ±1.80 / 25.24 ms │  1.06x slower │
│ QQuery 9  │    217.50 / 225.69 ±10.71 / 246.82 ms │    215.03 / 218.85 ±5.17 / 229.06 ms │     no change │
│ QQuery 10 │        71.33 / 77.74 ±9.49 / 96.50 ms │       70.97 / 78.85 ±6.84 / 91.59 ms │     no change │
│ QQuery 11 │        27.42 / 27.63 ±0.16 / 27.83 ms │       27.08 / 27.29 ±0.22 / 27.64 ms │     no change │
│ QQuery 12 │        65.77 / 70.60 ±4.12 / 75.71 ms │       67.77 / 72.26 ±2.37 / 74.50 ms │     no change │
│ QQuery 13 │    103.14 / 113.35 ±12.17 / 136.14 ms │     97.98 / 105.90 ±4.19 / 109.89 ms │ +1.07x faster │
│ QQuery 14 │        70.06 / 71.23 ±0.93 / 72.82 ms │       68.64 / 70.87 ±1.79 / 74.03 ms │     no change │
│ QQuery 15 │        69.28 / 73.78 ±4.36 / 81.31 ms │       69.94 / 72.40 ±3.04 / 78.39 ms │     no change │
│ QQuery 16 │        13.06 / 13.58 ±0.29 / 13.87 ms │       13.26 / 16.54 ±6.10 / 28.73 ms │  1.22x slower │
│ QQuery 17 │     147.35 / 148.70 ±1.38 / 151.26 ms │    148.41 / 149.59 ±0.92 / 150.99 ms │     no change │
│ QQuery 18 │     110.03 / 114.73 ±7.20 / 129.06 ms │    110.20 / 113.80 ±5.13 / 123.95 ms │     no change │
│ QQuery 19 │    387.67 / 533.48 ±74.02 / 588.86 ms │   382.41 / 496.04 ±87.96 / 574.21 ms │ +1.08x faster │
│ QQuery 20 │ 1284.37 / 1298.81 ±11.69 / 1313.36 ms │ 1276.43 / 1280.96 ±3.22 / 1283.84 ms │     no change │
│ QQuery 21 │      97.58 / 101.40 ±5.76 / 112.80 ms │       96.56 / 97.58 ±0.71 / 98.67 ms │     no change │
│ QQuery 22 │     102.40 / 105.28 ±2.19 / 107.65 ms │    104.18 / 108.00 ±5.16 / 118.18 ms │     no change │
│ QQuery 23 │     109.71 / 113.36 ±1.89 / 114.75 ms │    107.97 / 111.90 ±2.50 / 114.61 ms │     no change │
│ QQuery 24 │        26.94 / 27.29 ±0.26 / 27.75 ms │       26.88 / 28.00 ±1.67 / 31.33 ms │     no change │
│ QQuery 25 │        71.75 / 73.08 ±1.27 / 74.99 ms │       70.71 / 73.37 ±2.08 / 77.01 ms │     no change │
│ QQuery 26 │     103.36 / 108.32 ±3.26 / 113.50 ms │    102.54 / 108.40 ±3.17 / 111.53 ms │     no change │
└───────────┴───────────────────────────────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                  ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                  │ 4049.87ms │
│ Total Time (abstract-spill-file)   │ 3978.49ms │
│ Average Time (HEAD)                │  155.76ms │
│ Average Time (abstract-spill-file) │  153.02ms │
│ Queries Faster                     │         2 │
│ Queries Slower                     │         2 │
│ Queries with No Change             │        22 │
│ Queries with Failure               │         0 │
└────────────────────────────────────┴───────────┘

Resource Usage

smj — base (merge-base)

Metric Value
Wall time 25.0s
Peak memory 674.5 MiB
Avg memory 283.6 MiB
CPU user 169.5s
CPU sys 2.9s
Peak spill 0 B

smj — branch

Metric Value
Wall time 25.0s
Peak memory 646.5 MiB
Avg memory 291.6 MiB
CPU user 171.8s
CPU sys 2.9s
Peak spill 0 B

File an issue against this benchmark runner

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pantShrey -- I started going through this PR but it is big enough that understanding the implications is non trivial

I left several comments about the API design (speficially abotu Bytes vs std::io::Read and std::io::Write). Let me know what you think.

I wonder if some of the other commiters who are more familar with this code such as @2010YOUY01 and @rluvaton would have some time to review and offer their opinions as well.


#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub enum DiskManagerMode {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this is not a super useful thing to do at the moment


// Set up context with tight memory limit to force spilling
// Sorting needs some non-spillable memory, so 64 bytes should force spilling while still allowing the query to complete
// Sorting needs some non-spillable memory, so 608 bytes should force spilling while still allowing the query to complete

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this test need to change? It seems to me like we shouldn't have to change existing tests for a new pluggable backend -- shouldn't the new code only be exercised if you have explicitly added a new spilling backend?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I touched on this earlier in the PR (see the thread with @adriangb above), but happy to revisit! The test is extremely sensitive to memory thresholds ,the SpillWriteAdapter and the async read infrastructure together appear to push the baseline up enough that the original 64B limit becomes too tight for the operator to initialize, but increasing the limit caused RepartitionExec and RepartitionMerge to compete for the pool in a way that didn't trigger reliable spilling. I'll be honest that I'm not fully confident in exactly how much each piece contributes to the memory shift, which is part of why I preserved the semantic intent of the test (a spill occurs and output is correctly sorted) rather than trying to nail down a precise new threshold. I'd really appreciate your guidance on the best approach here, happy to go whichever direction you think is cleanest.

}
}

/// A simple, unmonitored file writer to support the deprecated `spill_record_batch_by_size` API.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does "unmonitored" mean?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just meant that it bypassed the DiskManager metrics tracking (it didn't update used_disk_space). However, since you suggested removing this deprecated code entirely below, I will delete it entirely

///
/// Stream format is used for spill because it supports dictionary replacement, and the random
/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
/// An adapter that implements `std::io::Write` to bridge Arrow's synchronous

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a lot like a BufferWrtier -- and it seems like this require a separate copy.

Wouldn't a better API just be to write directly to the inner spill writer?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After further review it seems the adapter is needed because the arrow IPC writer is writing directly to std::io::Write which could be a File or something else that didn't require a second copy.

I actually think keeping std::io::write is the most flexible here (as the backend itself can decide to buffer into chunks and async write, etc). I will comment on the main API

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, happy to make this change! The adapter was a bit unfortunate, it ended up there because in the issue discussion we'd moved away from using std::io::Write on the main trait, which led me toward Bytes to give backends owned data. But as you point out, that just pushed the intermediate copy one level up into the adapter itself. Switching to &[u8] directly lets us delete the adapter entirely, which is a much cleaner outcome.

let metadata = self.tempfile.as_file().metadata()?;
let new_disk_usage = metadata.len();

// Get the old disk usage

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to this code?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the tracking to inside FileSpillWriter::write(), updating the counter incrementally per write, so the callers don't need calling update_disk_usage() after each write

schema: SchemaRef,
state: SpillReaderStreamState,
decoder: StreamDecoder,
byte_stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send>>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems ok, though I do worry a little that decoding now requires an extra copy (read from File -> Bytes, and then read from Bytes into the internal buffers of the IPC decoder)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that StreamReader reads directly into its parse buffer without an intermediate Bytes object, so it genuinely avoids that step. The tradeoff is that driving it requires spawn_blocking, which reintroduces the thread pool exhaustion problem the old state machine existed to solve. The StreamDecoder path accepts that slight extra buffering in exchange for staying fully async.

note = "This method is deprecated. Use `SpillManager::spill_record_batch_by_size` instead."
)]
#[expect(clippy::needless_pass_by_value)]
pub fn spill_record_batch_by_size(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this method has been deprecated since DataFusion 46 I think we can remove it (and make this PR simpler).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!! I will remove the function and the related struct.

/// Writer for spill file backends.
/// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter.
pub trait SpillWriter: Send {
fn write(&mut self, data: Bytes) -> Result<()>;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all true -- however, I think that since the underlying IPC writer takes a std::io::Write, forcing all backends to use Bytes will likely require an extra unecessary copy (see comments below on SpillWriterAdapter) anways.

If you use a std::io::write like interface here, backends that want to queue chunks can do so (by copying into Bytes buffers themselves)

Thus what i suggest is:

  1. Change this to look more like std::io::wrote:
    fn write(&mut self, data: &[u8]) -> Result<()>;

Which will allow you to get rid of the write adapter

fn size(&self) -> Option<u64>;

/// Returns file contents as an async stream of byte chunks.
fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly to the writer API comment below, it seems like this will require all backends to copy data due to the fact that the Arrow StreamReader reads from a std::io::Read

https://docs.rs/arrow-ipc/59.0.0/arrow_ipc/reader/struct.StreamReader.html

Ideally the StreamReader and Writer could offer an async variant, but until that happens, it seems like it would be better to have this API return a read itself

SOmething like

    fn read_stream(&self) -> Result<Box<std::io::REad>> + Send>>>;

Alternate idea

The other idea we could explore would be to make SpillFile in terms of RecordBatches (so have it be responsible itself for storing and retriveing streams of RecordBatches (rather than streams of Bytes) to give the backend more flexibility in how it wanted to do IO

The more I talk about that the better I like this idea

@pantShrey pantShrey Jun 19, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current path doesn't use StreamReader at all , it uses StreamDecoder, Arrow's push-based API where you feed Bytes chunks and it emits RecordBatches without a Read impl. Returning Box<dyn Read> would reintroduce the thread pool exhaustion and the old state machine that existed to solve it. The StreamDecoder path keeps the read fully async via tokio::fs::File + ReaderStream

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected
Details

Comparing HEAD and abstract-spill-file
--------------------
Benchmark external_aggr.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃                               HEAD ┃               abstract-spill-file ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ Q1(64.0 MB)  │     56.47 / 60.31 ±3.77 / 67.38 ms │    58.56 / 61.46 ±3.36 / 67.88 ms │    no change │
│ Q1(32.0 MB)  │     51.15 / 52.79 ±1.29 / 54.17 ms │    55.60 / 57.68 ±2.10 / 61.25 ms │ 1.09x slower │
│ Q1(16.0 MB)  │     54.94 / 56.52 ±1.05 / 58.09 ms │    59.16 / 61.43 ±1.51 / 63.06 ms │ 1.09x slower │
│ Q2(512.0 MB) │ 262.47 / 290.74 ±17.74 / 316.12 ms │ 281.48 / 295.73 ±8.47 / 307.19 ms │    no change │
│ Q2(256.0 MB) │ 253.53 / 269.51 ±21.23 / 309.45 ms │ 263.20 / 264.93 ±1.21 / 266.78 ms │    no change │
│ Q2(128.0 MB) │ 253.51 / 284.35 ±40.83 / 356.51 ms │ 269.65 / 273.67 ±3.64 / 280.30 ms │    no change │
│ Q2(64.0 MB)  │ 251.51 / 275.61 ±39.20 / 353.66 ms │ 287.38 / 289.66 ±1.31 / 291.31 ms │ 1.05x slower │
│ Q2(32.0 MB)  │ 321.56 / 368.19 ±53.49 / 449.30 ms │ 380.82 / 384.33 ±2.27 / 387.78 ms │    no change │
└──────────────┴────────────────────────────────────┴───────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                  ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                  │ 1658.02ms │
│ Total Time (abstract-spill-file)   │ 1688.88ms │
│ Average Time (HEAD)                │  207.25ms │
│ Average Time (abstract-spill-file) │  211.11ms │
│ Queries Faster                     │         0 │
│ Queries Slower                     │         3 │
│ Queries with No Change             │         5 │
│ Queries with Failure               │         0 │
└────────────────────────────────────┴───────────┘

Resource Usage

external_aggr — base (merge-base)

Metric Value
Wall time 495.1s
Peak memory 548.5 MiB
Avg memory 10.9 MiB
CPU user 25.2s
CPU sys 3.5s
Peak spill 0 B

external_aggr — branch

Metric Value
Wall time 495.1s
Peak memory 442.5 MiB
Avg memory 9.5 MiB
CPU user 30.2s
CPU sys 8.7s
Peak spill 0 B

File an issue against this benchmark runner

@pantShrey

Copy link
Copy Markdown
Contributor Author

@alamb Thank you so much for going through this!

The RecordBatch-level abstraction idea is really interesting, and I'll go through the implications more comprehensively , but two things come to mind right away that are worth aligning on:

  1. Dependency direction: SpillFile lives in datafusion-execution, which doesn't currently depend on arrow-ipc. Moving IPC encoding/decoding into the OS backend would add that dependency.
  2. Schema: If read_stream() returns a SendableRecordBatchStream, the backend needs a schema to initialize the decoder either stored at creation time or passed as a SchemaRef parameter.

I do think this is the right long-term direction, and it would let us clean up SpillWriteAdapter entirely. That said, since this PR is currently blocking ParadeDB's Postgres integration, I'm a little worried that expanding the scope here might stall them further.

Happy to defer to your judgement on whether to tackle it in this PR or as a follow-up!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto detected api change Auto detected API change execution Related to the execution crate physical-plan Changes to the physical-plan crate review:waiting Ready for an initial review by a committer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow pluggable file backends in DiskManager and IPCStreamWriter to support non-OS file systems

4 participants