Skip to content

Commit 8bfd82e

Browse files
scottopell and claude committed
feat(capture): add file rotation support for Parquet format
Add rotation API to CaptureManager that allows rotating to new output files without stopping the capture. This enables long-running capture sessions to produce multiple readable Parquet files with valid footers.

Changes:
- Add RotationRequest/RotationSender types for async rotation requests
- Add start_with_rotation() that spawns event loop and returns (sender, JoinHandle)
- Add replace_format() to StateMachine for IO-agnostic format swapping
- Add rotate() trait method stub to OutputFormat (returns error by default)
- Add rotate_to() inherent method on parquet Format<BufWriter<File>>
- Return JoinHandle for graceful shutdown and data flush guarantees

The rotation flow:
1. Caller sends RotationRequest with new file path via RotationSender
2. CaptureManager creates new file and format
3. StateMachine.replace_format() flushes and swaps formats
4. Old format is closed (writing Parquet footer)
5. Response sent back to caller

The returned JoinHandle allows callers to await the capture task to ensure all buffered metrics are flushed before process exit.

Combines commits d0aebbe and 0542dd6 from sopell/expose-observer-public.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 618001c commit 8bfd82e

4 files changed

Lines changed: 274 additions & 1 deletion

File tree

lading_capture/src/formats.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ pub enum Error {
2424
/// IO errors during write operations
2525
#[error("IO error: {0}")]
2626
Io(#[from] std::io::Error),
27+
/// Rotation not supported for this format
28+
#[error("File rotation not supported for this format")]
29+
RotationNotSupported,
2730
}
2831

2932
/// Trait for output format implementations
@@ -63,6 +66,29 @@ pub trait OutputFormat {
6366
///
6467
/// Returns an error if closing fails.
6568
fn close(self) -> Result<(), Error>;
69+
70+
/// Rotate to a new output file
71+
///
72+
/// Flushes and closes the current file (writing footer for Parquet), then
73+
/// opens a new file at the specified path. This allows continuous metrics
74+
/// collection while producing multiple readable output files.
75+
///
76+
/// The default implementation returns `RotationNotSupported`. Formats that
77+
/// support rotation (like Parquet with file-based writers) should override.
78+
///
79+
/// # Arguments
80+
///
81+
/// * `path` - Path for the new output file
82+
///
83+
/// # Errors
84+
///
85+
/// Returns an error if rotation is not supported or if file operations fail.
86+
fn rotate(self, _path: std::path::PathBuf) -> Result<Self, Error>
87+
where
88+
Self: Sized,
89+
{
90+
Err(Error::RotationNotSupported)
91+
}
6692
}
6793

6894
#[cfg(test)]

lading_capture/src/formats/parquet.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ pub struct Format<W: Write + Seek + Send> {
153153
writer: ArrowWriter<W>,
154154
/// Pre-computed Arrow schema
155155
schema: Arc<Schema>,
156+
/// Compression level for Zstd (stored for rotation)
157+
compression_level: i32,
156158
}
157159

158160
impl<W: Write + Seek + Send> Format<W> {
@@ -192,6 +194,7 @@ impl<W: Write + Seek + Send> Format<W> {
192194
buffers: ColumnBuffers::new(),
193195
writer: arrow_writer,
194196
schema,
197+
compression_level,
195198
})
196199
}
197200

@@ -338,6 +341,36 @@ impl<W: Write + Seek + Send> crate::formats::OutputFormat for Format<W> {
338341
}
339342
}
340343

344+
impl Format<std::io::BufWriter<std::fs::File>> {
345+
/// Rotate to a new output file
346+
///
347+
/// Closes the current Parquet file (writing footer) and opens a new file
348+
/// at the specified path with the same compression settings.
349+
///
350+
/// # Errors
351+
///
352+
/// Returns an error if closing the current file or creating the new file fails.
353+
pub fn rotate_to(self, path: std::path::PathBuf) -> Result<Self, Error> {
354+
// Store compression level before closing
355+
let compression_level = self.compression_level;
356+
357+
// Close current file (writes footer)
358+
self.close()?;
359+
360+
// Create new file and writer
361+
let file = std::fs::File::create(&path)?;
362+
let writer = std::io::BufWriter::new(file);
363+
let format = Self::new(writer, compression_level)?;
364+
365+
Ok(format)
366+
}
367+
368+
/// Get the compression level for this format
369+
pub fn compression_level(&self) -> i32 {
370+
self.compression_level
371+
}
372+
}
373+
341374
#[cfg(test)]
342375
mod tests {
343376
use super::*;

lading_capture/src/manager.rs

Lines changed: 182 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::{
1717
};
1818

1919
use arc_swap::ArcSwap;
20-
use tokio::{fs, sync::mpsc, time};
20+
use tokio::{fs, sync::mpsc, sync::oneshot, time};
2121

2222
use crate::{
2323
accumulator,
@@ -445,6 +445,27 @@ impl CaptureManager<formats::jsonl::Format<BufWriter<std::fs::File>>, RealClock>
445445
}
446446
}
447447

448+
/// Request to rotate to a new output file
449+
///
450+
/// Contains the path for the new file and a channel to send the result.
451+
pub struct RotationRequest {
452+
/// Path for the new output file
453+
pub path: PathBuf,
454+
/// Channel to send rotation result (Ok on success, Err on failure)
455+
pub response: oneshot::Sender<Result<(), formats::Error>>,
456+
}
457+
458+
impl std::fmt::Debug for RotationRequest {
459+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460+
f.debug_struct("RotationRequest")
461+
.field("path", &self.path)
462+
.finish_non_exhaustive()
463+
}
464+
}
465+
466+
/// Handle for sending rotation requests to a running CaptureManager
467+
pub type RotationSender = mpsc::Sender<RotationRequest>;
468+
448469
impl CaptureManager<formats::parquet::Format<BufWriter<std::fs::File>>, RealClock> {
449470
/// Create a new [`CaptureManager`] with file-based Parquet writer
450471
///
@@ -478,6 +499,166 @@ impl CaptureManager<formats::parquet::Format<BufWriter<std::fs::File>>, RealCloc
478499
RealClock::default(),
479500
))
480501
}
502+
503+
/// Run [`CaptureManager`] with file rotation support
504+
///
505+
/// Similar to [`start`](CaptureManager::start), but also provides a channel
506+
/// for rotation requests. When a rotation request is received, the current
507+
/// Parquet file is finalized (footer written) and a new file is created at
508+
/// the specified path.
509+
///
510+
/// Returns a tuple of ([`RotationSender`], [`JoinHandle`](tokio::task::JoinHandle))
511+
/// immediately. The `RotationSender` can be used to trigger rotations while
512+
/// the event loop runs. The `JoinHandle` can be awaited to ensure the
513+
/// CaptureManager has fully drained and closed before shutdown.
514+
///
515+
/// # Errors
516+
///
517+
/// Returns an error if there is already a global recorder set.
518+
#[allow(clippy::cast_possible_truncation)]
519+
pub async fn start_with_rotation(
520+
mut self,
521+
) -> Result<(RotationSender, tokio::task::JoinHandle<()>), Error> {
522+
// Create rotation channel - return the sender immediately
523+
let (rotation_tx, rotation_rx) = mpsc::channel::<RotationRequest>(4);
524+
525+
// Initialize historical sender
526+
HISTORICAL_SENDER.store(Arc::new(Some(Arc::new(Sender {
527+
snd: self.snd.clone(),
528+
}))));
529+
530+
self.install()?;
531+
info!("Capture manager installed with rotation support, recording to capture file.");
532+
533+
// Wait until the target is running then mark time-zero
534+
self.target_running.recv().await;
535+
self.clock.mark_start();
536+
537+
let compression_level = self.format.compression_level();
538+
539+
// Run the event loop in a spawned task so we can return the sender immediately
540+
let expiration = self.expiration;
541+
let format = self.format;
542+
let flush_seconds = self.flush_seconds;
543+
let registry = self.registry;
544+
let accumulator = self.accumulator;
545+
let global_labels = self.global_labels;
546+
let clock = self.clock;
547+
let recv = self.recv;
548+
let shutdown = self.shutdown.take().expect("shutdown watcher must be present");
549+
550+
let handle = tokio::spawn(async move {
551+
if let Err(e) = Self::rotation_event_loop(
552+
expiration,
553+
format,
554+
flush_seconds,
555+
registry,
556+
accumulator,
557+
global_labels,
558+
clock,
559+
recv,
560+
shutdown,
561+
rotation_rx,
562+
compression_level,
563+
)
564+
.await
565+
{
566+
error!(error = %e, "CaptureManager rotation event loop error");
567+
}
568+
});
569+
570+
Ok((rotation_tx, handle))
571+
}
572+
573+
/// Internal event loop with rotation support
574+
#[allow(clippy::too_many_arguments)]
575+
async fn rotation_event_loop(
576+
expiration: Duration,
577+
format: formats::parquet::Format<BufWriter<std::fs::File>>,
578+
flush_seconds: u64,
579+
registry: Arc<Registry<Key, AtomicStorage>>,
580+
accumulator: Accumulator,
581+
global_labels: FxHashMap<String, String>,
582+
clock: RealClock,
583+
mut recv: mpsc::Receiver<Metric>,
584+
shutdown: lading_signal::Watcher,
585+
mut rotation_rx: mpsc::Receiver<RotationRequest>,
586+
compression_level: i32,
587+
) -> Result<(), Error> {
588+
let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64));
589+
let shutdown_wait = shutdown.recv();
590+
tokio::pin!(shutdown_wait);
591+
592+
// Create state machine with owned state
593+
let mut state_machine = StateMachine::new(
594+
expiration,
595+
format,
596+
flush_seconds,
597+
registry,
598+
accumulator,
599+
global_labels,
600+
clock,
601+
);
602+
603+
// Event loop with rotation support
604+
loop {
605+
let event = tokio::select! {
606+
val = recv.recv() => {
607+
match val {
608+
Some(metric) => Event::MetricReceived(metric),
609+
None => Event::ChannelClosed,
610+
}
611+
}
612+
() = flush_interval.tick() => Event::FlushTick,
613+
Some(rotation_req) = rotation_rx.recv() => {
614+
// Handle rotation inline since it's not a state machine event
615+
let result = Self::handle_rotation(
616+
&mut state_machine,
617+
rotation_req.path,
618+
compression_level,
619+
).await;
620+
// Send result back to caller (ignore send error if receiver dropped)
621+
let _ = rotation_req.response.send(result);
622+
continue;
623+
}
624+
() = &mut shutdown_wait => Event::ShutdownSignaled,
625+
};
626+
627+
match state_machine.next(event)? {
628+
Operation::Continue => {}
629+
Operation::Exit => return Ok(()),
630+
}
631+
}
632+
}
633+
634+
/// Handle a rotation request
635+
async fn handle_rotation(
636+
state_machine: &mut StateMachine<
637+
formats::parquet::Format<BufWriter<std::fs::File>>,
638+
RealClock,
639+
>,
640+
new_path: PathBuf,
641+
compression_level: i32,
642+
) -> Result<(), formats::Error> {
643+
// Create new file and format
644+
let fp = fs::File::create(&new_path)
645+
.await
646+
.map_err(formats::Error::Io)?;
647+
let fp = fp.into_std().await;
648+
let writer = BufWriter::new(fp);
649+
let new_format = parquet::Format::new(writer, compression_level)?;
650+
651+
// Swap formats - this flushes any buffered data
652+
let old_format = state_machine
653+
.replace_format(new_format)
654+
.map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;
655+
656+
// Close old format to write Parquet footer
657+
old_format.close()?;
658+
659+
info!(path = %new_path.display(), "Rotated to new capture file");
660+
Ok(())
661+
}
481662
}
482663

483664
impl

lading_capture/src/manager/state_machine.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,39 @@ impl<F: OutputFormat, C: Clock> StateMachine<F, C> {
258258
Ok(Operation::Exit)
259259
}
260260

261+
/// Replace the current format with a new one, returning the old format.
262+
///
263+
/// This method flushes any buffered data before returning the old format.
264+
/// The caller is responsible for closing the old format (to write any
265+
/// footer/metadata) and providing a properly initialized new format.
266+
///
267+
/// This enables file rotation: the caller can close the old format (writing
268+
/// the Parquet footer), create a new file, and provide the new format.
269+
///
270+
/// # Errors
271+
///
272+
/// Returns an error if flushing the current format fails.
273+
///
274+
/// # Panics
275+
///
276+
/// Panics if called when no format is present (after shutdown).
277+
pub(crate) fn replace_format(&mut self, new_format: F) -> Result<F, Error> {
278+
// Flush any buffered data in the current format
279+
self.format
280+
.as_mut()
281+
.expect("format must be present during operation")
282+
.flush()?;
283+
284+
// Swap in the new format and return the old one
285+
let old_format = self
286+
.format
287+
.replace(new_format)
288+
.expect("format must be present during operation");
289+
290+
info!("Format replaced for file rotation");
291+
Ok(old_format)
292+
}
293+
261294
/// Convert an Instant timestamp to `Accumulator` logical tick time.
262295
#[inline]
263296
fn instant_to_tick(&self, timestamp: Instant) -> u64 {

0 commit comments

Comments
 (0)