Skip to content

Commit 00c533b

Browse files
committed
fix: finalize S3 uploads for CSV/TBL writes
1 parent 012154f commit 00c533b

4 files changed

Lines changed: 53 additions & 10 deletions

File tree

spatialbench-cli/src/generate.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ pub trait Source: Send {
 /// Something that can write the contents of a buffer somewhere
 ///
 /// For example, this is implemented for a file writer.
-pub trait Sink: Send {
+pub trait Sink: Send + Sized {
     /// Write all data from the buffer to the sink
     fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>;
 
-    /// Complete and flush any remaining data from the sink
-    fn flush(self) -> Result<(), io::Error>;
+    /// Complete and flush any remaining data from the sink, returning it
+    /// so the caller can perform additional finalization (e.g. async S3 upload).
+    fn flush(self) -> Result<Self, io::Error>;
 }
 
 /// Generates data in parallel from a series of [`Source`] and writes to a [`Sink`]
@@ -69,7 +70,7 @@ pub async fn generate_in_chunks<G, I, S>(
     mut sink: S,
     sources: I,
     num_threads: usize,
-) -> Result<(), io::Error>
+) -> Result<S, io::Error>
 where
     G: Source + 'static,
     I: Iterator<Item = G>,
@@ -86,7 +87,7 @@ where
 
     // write the header
     let Some(first) = sources.peek() else {
-        return Ok(()); // no sources
+        return Ok(sink); // no sources
     };
     let header = first.header(Vec::new());
     tx.send(header)
@@ -131,7 +132,8 @@ where
             sink.sink(&buffer)?;
             captured_recycler.return_buffer(buffer);
         }
-        // No more input, flush the sink and return
+        // No more input, flush the sink and return it so the caller can
+        // perform additional finalization (e.g. async S3 upload).
         sink.flush()
     });
 
spatialbench-cli/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ impl<W: Write> WriterSink<W> {
             statistics: WriteStatistics::new("buffers"),
         }
     }
+
+    /// Consume the sink and return the inner writer for further finalization.
+    fn into_inner(self) -> W {
+        self.inner
+    }
 }
 
 impl<W: Write + Send> Sink for WriterSink<W> {
@@ -430,7 +435,8 @@ impl<W: Write + Send> Sink for WriterSink<W> {
         self.inner.write_all(buffer)
     }
 
-    fn flush(mut self) -> Result<(), io::Error> {
-        self.inner.flush()
+    fn flush(mut self) -> Result<Self, io::Error> {
+        self.inner.flush()?;
+        Ok(self)
     }
 }

spatialbench-cli/src/runner.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ where
     match plan.output_location() {
         OutputLocation::Stdout => {
             let sink = WriterSink::new(io::stdout());
-            generate_in_chunks(sink, sources, num_threads).await
+            generate_in_chunks(sink, sources, num_threads).await?;
+            Ok(())
         }
         OutputLocation::File(path) => {
             // if the output already exists, skip running
@@ -224,7 +225,9 @@ where
             info!("Writing to S3: {}", uri);
             let s3_writer = S3Writer::with_client(Arc::clone(client), path);
             let sink = WriterSink::new(s3_writer);
-            generate_in_chunks(sink, sources, num_threads).await
+            let sink = generate_in_chunks(sink, sources, num_threads).await?;
+            sink.into_inner().finish().await?;
+            Ok(())
         }
     }
 }

spatialbench-cli/src/s3_writer.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,4 +533,36 @@ mod tests {
         writer.write_all(&[4, 5]).unwrap();
         assert_eq!(writer.total_bytes(), 5);
     }
+
+    /// Verify that `std::io::Write::flush()` does NOT upload data to S3.
+    /// Data is only uploaded when `finish()` is called. This test guards
+    /// against the bug where CSV/TBL writes were silently lost because
+    /// the `WriterSink` called `flush()` (a no-op) but never `finish()`.
+    #[tokio::test]
+    async fn flush_does_not_upload_without_finish() {
+        let store = Arc::new(InMemory::new());
+        let mut writer = S3Writer::with_client(store.clone(), "output/flush_test.csv");
+
+        let data = b"col1,col2\nfoo,bar\n";
+        writer.write_all(data).unwrap();
+        writer.flush().unwrap();
+
+        // Data should NOT be in the store yet — flush is a no-op
+        let result = store.get(&ObjectPath::from("output/flush_test.csv")).await;
+        assert!(
+            result.is_err(),
+            "data should not be uploaded before finish()"
+        );
+
+        // Now call finish — data should appear
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, data.len());
+
+        let result = store
+            .get(&ObjectPath::from("output/flush_test.csv"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        assert_eq!(stored.as_ref(), data);
+    }
 }

0 commit comments

Comments
 (0)