Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ impl MultiLevelMergeBuilder {
// as we are not holding the memory for them
let mut sorted_streams = mem::take(&mut self.sorted_streams);

let (sorted_spill_files, buffer_size) = self
let (sorted_spill_files, _) = self
.get_sorted_spill_files_to_merge(
2,
// No read-ahead buffering needed, reserve memory for 1 batch per file
1,
// we must have at least 2 streams to merge
2_usize.saturating_sub(sorted_streams.len()),
&mut memory_reservation,
Expand All @@ -273,7 +274,6 @@ impl MultiLevelMergeBuilder {
let stream = self
.spill_manager
.clone()
.with_batch_read_buffer_capacity(buffer_size)
.read_spill_as_stream(
spill.file,
Some(spill.max_record_batch_memory),
Expand Down
14 changes: 2 additions & 12 deletions datafusion/physical-plan/src/spill/spill_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;

use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};
use crate::metrics::SpillMetrics;

/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
Expand All @@ -41,8 +41,6 @@ pub struct SpillManager {
env: Arc<RuntimeEnv>,
pub(crate) metrics: SpillMetrics,
schema: SchemaRef,
/// Number of batches to buffer in memory during disk reads
batch_read_buffer_capacity: usize,
/// general-purpose compression options
pub(crate) compression: SpillCompression,
}
Expand All @@ -53,18 +51,10 @@ impl SpillManager {
env,
metrics,
schema,
batch_read_buffer_capacity: 2,
compression: SpillCompression::default(),
}
}

pub fn with_batch_read_buffer_capacity(
mut self,
batch_read_buffer_capacity: usize,
) -> Self {
self.batch_read_buffer_capacity = batch_read_buffer_capacity;
self
}

pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
self.compression = spill_compression;
Expand Down Expand Up @@ -186,7 +176,7 @@ impl SpillManager {
max_record_batch_memory,
)));

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems `batch_read_buffer_capacity` is no longer used.
It could be deprecated, or perhaps even removed.
https://github.com/dekuu5/datafusion/blob/1b8ef43fdd1424a3e4fe2db213fec4e7228788b0/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L276 is a no-op

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, sure — I will look into that, and into the tests as well.

Ok(stream)
}

/// Same as `read_spill_as_stream`, but without buffering.
Expand Down
40 changes: 40 additions & 0 deletions datafusion/physical-plan/src/spill/spill_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,4 +1441,44 @@ mod tests {

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_concurrent_writer_reader_race_condition() -> Result<()> {
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion
// stress testing the concurrency in the reader and the writer to make sure there is now race condition

// going for 100 iterations with a 5 batches per iteration
const NUM_BATCHES: usize = 5;
const ITERATIONS: usize = 100;

for iteration in 0..ITERATIONS {
let (writer, mut reader) = create_spill_channel(1024 * 1024);

let writer_handle = SpawnedTask::spawn(async move {
for i in 0..NUM_BATCHES {
let batch = create_test_batch(i as i32 * 10, 10);
writer.push_batch(&batch).unwrap();
tokio::task::yield_now().await;
}
});

let reader_handle = SpawnedTask::spawn(async move {
let mut batches_read = 0;
while let Some(result) = reader.next().await {
let _batch = result.unwrap();
batches_read += 1;
tokio::task::yield_now().await;
}
batches_read
});

writer_handle.join().await.unwrap();
let batches_read = reader_handle.join().await.unwrap();

assert_eq!(
batches_read, NUM_BATCHES,
"Iteration {iteration}: Expected {NUM_BATCHES} got {batches_read}."
);
}

Ok(())
}
}
Loading