feat: introduce pluggable SpillFile trait and TempFileFactory for custom spill backends#21882
feat: introduce pluggable SpillFile trait and TempFileFactory for custom spill backends#21882pantShrey wants to merge 2 commits into
Conversation
|
@alamb I opened this draft PR to get early feedback on the architecture.
I might be missing something here, so would really appreciate your guidance. |
|
Thanks -- will try and look at this shortly |
Kind of, though it seems like accumulating technical debt as we'll have APIs that will not be needed once we complete the work for SortMergeJoin What do you think about making a first PR to migrate SortMergeJoin to use the spill abstraction?
Makes sense to me |
alamb
left a comment
There was a problem hiding this comment.
Thanks @pantShrey - I reviewed this and the basic idea looks good to me. I do think it would be nice to have a unified (async) IO abstraction rather than leaving some hook around for sync IO and making this API more complicated
| used_disk_space: Arc<AtomicU64>, | ||
| /// Number of active temporary files created by this disk manager | ||
| active_files_count: Arc<AtomicUsize>, | ||
| /// Custom Backend |
There was a problem hiding this comment.
A small nit: I think "custom" is a somewhat unecessary term here . Perhaps this
factory: Option<Arc<dyn TempFileFactory>>,or
temp_file_factory: Option<Arc<dyn TempFileFactory>>,would be more consistent with the rest of the codebase
|
|
||
| #[derive(Clone, Debug, Default)] | ||
| #[derive(Clone, Default)] | ||
| pub enum DiskManagerMode { |
There was a problem hiding this comment.
If we are going to permit a TempFileFactory here, I wonder if it would make sense (maybe as a follow on PR) here to move everything over
So like
pub enum DiskManagerMode {
Custom(Arc<dyn TempFileFactory>),
}And then implement a basic DirectoriesTempFileFactor and a NoTempFilesFactory that are provided along with datafusion
That might simplify the code, and it would also ensure the TempFileFactory API is rich enough for the existing temp file strategies
There was a problem hiding this comment.
maybe this is not a super useful thing to do at the moment
| .collect() | ||
| } | ||
|
|
||
| pub struct OsSpillWriter { |
| /// Writer for spill file backends. | ||
| /// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter. | ||
| pub trait SpillWriter: Send { | ||
| fn write(&mut self, data: Bytes) -> Result<()>; |
There was a problem hiding this comment.
This is pretty similar to https://doc.rust-lang.org/std/io/trait.Write.html 🤔
There was a problem hiding this comment.
Yes, you are right. The reason I didn't use Write trait which uses &[u8] was for ownership reasons. Some backends might queue chunks to a background task (e.g., S3 multipart via a channel) and need to hold the data past the write() call's return. &[u8] can't express that, and it would force a second copy between the SpillWriteAdapter and the SpillWriter.
Also, the custom SpillWriter trait contains finish(), which maps perfectly to complete_multipart_upload for S3 and resource owner cleanup for Postgres.
There was a problem hiding this comment.
This is all true -- however, I think that since the underlying IPC writer takes a std::io::Write, forcing all backends to use Bytes will likely require an extra unecessary copy (see comments below on SpillWriterAdapter) anways.
If you use a std::io::write like interface here, backends that want to queue chunks can do so (by copying into Bytes buffers themselves)
Thus what i suggest is:
- Change this to look more like std::io::wrote:
fn write(&mut self, data: &[u8]) -> Result<()>;Which will allow you to get rid of the write adapter
|
@alamb Thank you so much for the review! I scoped out the SortMergeJoin migration today, specifically looking at bitwise_stream.rs and process_key_match_with_filter, to see what it would take. Because SortMergeJoin currently reads from the spill file via a synchronous for loop inside a hand-rolled poll state machine, making the read path truly async requires a major rewrite. We can't just .await the stream, so we may need to store the SendableRecordBatchStream in the execution state and manually persist variables like matched_count across Poll::Pending yields. Because ParadeDB is hoping to unblock their Postgres integration next week, I'm worried a state machine rewrite of this scale will stall them. Would you be open to merging this core abstraction first (with open_sync_reader marked as #[deprecated])? I can open a dedicated tracking issue for the SortMergeJoin async migration and tackle it as a fast follow-up PR. I am happy to defer to your judgment if you feel the tech debt must be addressed first! |
How about we try it in parallel? |
@alamb sure, i have already started to work on that locally while waiting for the response also i am actually still stuck on the The issue stems from the fact that RepartitionMerge now requires more memory than a RepartitionExec node, this greedily allocates memory to RepartitionExec which could have spilled instead of RepartitionMerge which cannot spill. I would really appreciate any guidance on this, am I missing something obvious here? |
Sadly I am not familar with this test so I don't have a lot to offer you Maybe you can look at git history and see who introduced the test and maybe they might have some ideas |
|
Hey @adriangb, Andrew suggested I reach out to you since you originally authored The test is currently stuck in a memory-accounting deadlock. Here’s what is happening:
I was able to trigger a spill once by setting the test memory limit to 608 B, but even that was not sufficient for the test to pass reliably. Is there a correct or idiomatic way to configure this test (batch sizes, data volume, memory pool limits, etc.) to reliably force a I would really appreciate any guidance you could provide. |
|
IIRC that test was added when we added spilling to RepartitionExec. Conceptually the test is simple: if RepartitionExec is configured to preserve order and it spills we need to make sure that spilling did not shuffle the data. The orchestration however is difficult: forcing a RepartitionExec to spill usually requires skewed upstream partition consumption rates. You could try to change the test to eg use a GroupBy or maybe we can use a RepartitionExec in isolation if we pull from the streams in the right way. I think the structure can be changed quite a bit as long as we preserve the semantic meaning of the test, I am not surprised that it is pretty fragile to changes. |
2971e41 to
de6697f
Compare
|
@alamb I’ve addressed the nits and force-pushed the updates. Could you please trigger the CI and take another look when you have a moment? In the meantime, I am working on migrating |
|
@adriangb Thank you so much for the guidance! I updated the test to simply assert that a spill does occur |
That makes sense to me. |
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
|
|
e31bff4 to
086632a
Compare
|
Hey @alamb, quick update! While working on the I believe the PR is now ready for review, so I've marked it as such. I'd appreciate another look whenever you have the time. Thank you! |
Grerat-- can you please make a PR for just the SMJ refactor and then stack this PR on it? |
|
That will make it easier / faster to review (I am not a SMJ expert so I can't really review that part effiicently) |
915532b to
6954b55
Compare
|
Hey @alamb, quick update! I've reworked both PRs to make them easier to review independently:
The plan is for #22230 to merge first, then I'll rebase this on top of it. Would be grateful if you could take a look whenever you get the chance! |
…e#22230) ~~## Note: This PR depends on apache#21882 (pluggable SpillFile trait) and cannot be merged before it. Opening in parallel per @alamb's suggestion for easier review. The required SpillFile trait used here is defined in that base PR.To review locally, apply apache#21882 first and then stack this branch on top.~~ **Update:** This PR has been rebased to use the existing `RefCountedTempFile` and is now completely standalone. It can be reviewed and merged independently ## Which issue does this PR close? - Contributes to apache#21215 (and is required by apache#21882) ## Rationale for this change `materializing_stream.rs` and `bitwise_stream.rs` were reading spilled batches via `open_sync_reader` / direct `File::open` calls ~~, bypassing the `SpillFile` abstraction introduced in apache#21882~~. This PR migrates both to use `SpillManager::read_spill_as_stream`. This safely converts the SMJ to an async I/O path, preparing the ground for custom backends (Postgres BufFile, object storage) to handle spill reads without requiring an OS file path. ## What changes are included in this PR? - `materializing_stream.rs`: Eagerly restores spilled `BufferedBatches` via async streams before freezing, avoiding new state machine variants. - `bitwise_stream.rs`: Replaces sync reads with an async `poll_next_unpin` loop, caching the stream to survive `Poll::Pending`. ~~- `spill_file.rs`: Removes `open_sync_reader` from the `SpillFile` trait (no longer needed).~~ ## Are these changes tested? Covered by existing SMJ tests. No new tests added, the behavioral change is internal (sync → async IO path). ## Are there any user-facing changes? No. ~~Removes `open_sync_reader` from the SpillFile trait, this is a breaking API change for anyone implementing the trait, but the trait was introduced in apache#21882 which has not merged yet so there are no external implementors.~~ --------- Co-authored-by: Kumar Ujjawal <ujjawalpathak6@gmail.com>
6954b55 to
d71a415
Compare
|
run benchmark spill_io |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (d71a415) to 6176a6d (merge-base) diff using: spill_io File an issue against this benchmark runner |
|
run benchmark external_aggr |
|
run benchmark smj |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (d71a415) to 6176a6d (merge-base) diff using: smj File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (d71a415) to 6176a6d (merge-base) diff using: external_aggr File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagespill_io — base (merge-base)
spill_io — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesmj — base (merge-base)
smj — branch
File an issue against this benchmark runner |
alamb
left a comment
There was a problem hiding this comment.
Thanks @pantShrey -- I started going through this PR but it is big enough that understanding the implications is non trivial
I left several comments about the API design (speficially abotu Bytes vs std::io::Read and std::io::Write). Let me know what you think.
I wonder if some of the other commiters who are more familar with this code such as @2010YOUY01 and @rluvaton would have some time to review and offer their opinions as well.
|
|
||
| #[derive(Clone, Debug, Default)] | ||
| #[derive(Clone, Default)] | ||
| pub enum DiskManagerMode { |
There was a problem hiding this comment.
maybe this is not a super useful thing to do at the moment
|
|
||
| // Set up context with tight memory limit to force spilling | ||
| // Sorting needs some non-spillable memory, so 64 bytes should force spilling while still allowing the query to complete | ||
| // Sorting needs some non-spillable memory, so 608 bytes should force spilling while still allowing the query to complete |
There was a problem hiding this comment.
Why did this test need to change? It seems to me like we shouldn't have to change existing tests for a new pluggable backend -- shouldn't the new code only be exercised if you have explicitly added a new spilling backend?
There was a problem hiding this comment.
I touched on this earlier in the PR (see the thread with @adriangb above), but happy to revisit! The test is extremely sensitive to memory thresholds ,the SpillWriteAdapter and the async read infrastructure together appear to push the baseline up enough that the original 64B limit becomes too tight for the operator to initialize, but increasing the limit caused RepartitionExec and RepartitionMerge to compete for the pool in a way that didn't trigger reliable spilling. I'll be honest that I'm not fully confident in exactly how much each piece contributes to the memory shift, which is part of why I preserved the semantic intent of the test (a spill occurs and output is correctly sorted) rather than trying to nail down a precise new threshold. I'd really appreciate your guidance on the best approach here, happy to go whichever direction you think is cleanest.
| } | ||
| } | ||
|
|
||
| /// A simple, unmonitored file writer to support the deprecated `spill_record_batch_by_size` API. |
There was a problem hiding this comment.
what does "unmonitored" mean?
There was a problem hiding this comment.
I just meant that it bypassed the DiskManager metrics tracking (it didn't update used_disk_space). However, since you suggested removing this deprecated code entirely below, I will delete it entirely
| /// | ||
| /// Stream format is used for spill because it supports dictionary replacement, and the random | ||
| /// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement). | ||
| /// An adapter that implements `std::io::Write` to bridge Arrow's synchronous |
There was a problem hiding this comment.
This looks a lot like a BufferWrtier -- and it seems like this require a separate copy.
Wouldn't a better API just be to write directly to the inner spill writer?
There was a problem hiding this comment.
After further review it seems the adapter is needed because the arrow IPC writer is writing directly to std::io::Write which could be a File or something else that didn't require a second copy.
I actually think keeping std::io::write is the most flexible here (as the backend itself can decide to buffer into chunks and async write, etc). I will comment on the main API
There was a problem hiding this comment.
Agreed, happy to make this change! The adapter was a bit unfortunate, it ended up there because in the issue discussion we'd moved away from using std::io::Write on the main trait, which led me toward Bytes to give backends owned data. But as you point out, that just pushed the intermediate copy one level up into the adapter itself. Switching to &[u8] directly lets us delete the adapter entirely, which is a much cleaner outcome.
| let metadata = self.tempfile.as_file().metadata()?; | ||
| let new_disk_usage = metadata.len(); | ||
|
|
||
| // Get the old disk usage |
There was a problem hiding this comment.
I moved the tracking to inside FileSpillWriter::write(), updating the counter incrementally per write, so the callers don't need calling update_disk_usage() after each write
| schema: SchemaRef, | ||
| state: SpillReaderStreamState, | ||
| decoder: StreamDecoder, | ||
| byte_stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send>>, |
There was a problem hiding this comment.
this seems ok, though I do worry a little that decoding now requires an extra copy (read from File -> Bytes, and then read from Bytes into the internal buffers of the IPC decoder)
There was a problem hiding this comment.
You're right that StreamReader reads directly into its parse buffer without an intermediate Bytes object, so it genuinely avoids that step. The tradeoff is that driving it requires spawn_blocking, which reintroduces the thread pool exhaustion problem the old state machine existed to solve. The StreamDecoder path accepts that slight extra buffering in exchange for staying fully async.
| note = "This method is deprecated. Use `SpillManager::spill_record_batch_by_size` instead." | ||
| )] | ||
| #[expect(clippy::needless_pass_by_value)] | ||
| pub fn spill_record_batch_by_size( |
There was a problem hiding this comment.
Given this method has been deprecated since DataFusion 46 I think we can remove it (and make this PR simpler).
There was a problem hiding this comment.
Update here is the PR:
There was a problem hiding this comment.
Thank you!! I will remove the function and the related struct.
| /// Writer for spill file backends. | ||
| /// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter. | ||
| pub trait SpillWriter: Send { | ||
| fn write(&mut self, data: Bytes) -> Result<()>; |
There was a problem hiding this comment.
This is all true -- however, I think that since the underlying IPC writer takes a std::io::Write, forcing all backends to use Bytes will likely require an extra unecessary copy (see comments below on SpillWriterAdapter) anways.
If you use a std::io::write like interface here, backends that want to queue chunks can do so (by copying into Bytes buffers themselves)
Thus what i suggest is:
- Change this to look more like std::io::wrote:
fn write(&mut self, data: &[u8]) -> Result<()>;Which will allow you to get rid of the write adapter
| fn size(&self) -> Option<u64>; | ||
|
|
||
| /// Returns file contents as an async stream of byte chunks. | ||
| fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>; |
There was a problem hiding this comment.
similarly to the writer API comment below, it seems like this will require all backends to copy data due to the fact that the Arrow StreamReader reads from a std::io::Read
https://docs.rs/arrow-ipc/59.0.0/arrow_ipc/reader/struct.StreamReader.html
Ideally the StreamReader and Writer could offer an async variant, but until that happens, it seems like it would be better to have this API return a read itself
SOmething like
fn read_stream(&self) -> Result<Box<std::io::REad>> + Send>>>;Alternate idea
The other idea we could explore would be to make SpillFile in terms of RecordBatches (so have it be responsible itself for storing and retriveing streams of RecordBatches (rather than streams of Bytes) to give the backend more flexibility in how it wanted to do IO
The more I talk about that the better I like this idea
There was a problem hiding this comment.
The current path doesn't use StreamReader at all , it uses StreamDecoder, Arrow's push-based API where you feed Bytes chunks and it emits RecordBatches without a Read impl. Returning Box<dyn Read> would reintroduce the thread pool exhaustion and the old state machine that existed to solve it. The StreamDecoder path keeps the read fully async via tokio::fs::File + ReaderStream
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageexternal_aggr — base (merge-base)
external_aggr — branch
File an issue against this benchmark runner |
|
@alamb Thank you so much for going through this! The
I do think this is the right long-term direction, and it would let us clean up Happy to defer to your judgement on whether to tackle it in this PR or as a follow-up! |
Which issue does this PR close?
Rationale for this change
DataFusion’s spill infrastructure is tightly coupled to OS-level files, with no extension points for alternative storage backends.
DiskManagercannot be customized for file creation, andIPCStreamWriterdepends on OS file paths.This prevents integration in environments where temporary storage must be managed by the host system. For example, Postgres extensions (e.g., ParadeDB) require spill files to go through
BufFileAPIs to respecttemp_tablespaces, enforcetemp_file_limit, and integrate with transaction-scoped cleanup. SinceBufFilehas no OS-visible path, it cannot work with the current design.A secondary motivation raised by @alamb is supporting object storage backends (S3, GCS) for spilling, which require async IO and cannot use
std::io::Writeorstd::io::Read.What changes are included in this PR?
SpillFile,SpillWriter, andTempFileFactorytraits to abstract spill file handlingDiskManagerMode::Customto allow pluggable backendsDiskManagerto returnArc<dyn SpillFile>instead of OS-bound typesSpillWriteAdapterto bridge sync Arrow writers with backend-agnostic writersStream<Item = Result<Bytes>>) instead of blocking state machinesArc<dyn SpillFile>Are these changes tested?
Yes. Existing spill tests cover the full read/write flow.
test_disk_usage_decreases_as_files_consumedby correcting a pre-existing off-by-one assumption in file rotationtest_preserve_order_with_spillingby just asserting spilling occurs (spill_count>0) and output batches are sortedAre there any user-facing changes?
Yes this introduces API changes:
Arc<dyn SpillFile>instead ofRefCountedTempFileSpillFile,SpillWriter,TempFileFactoryDiskManagerMode::Customfor custom backendsCustom spill backends can now be implemented and plugged in via
DiskManager.