Skip to content

Commit a2c8d36

Browse files
committed
flush every
1 parent 9ae8ce6 commit a2c8d36

1 file changed

Lines changed: 19 additions & 1 deletion

File tree

lading/src/generator/file_gen/traditional.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ pub struct Config {
122122
rotate: bool,
123123
/// The load throttle configuration
124124
pub throttle: Option<ThrottleConfig>,
125+
/// Force flush every n blocks. This generator uses a BufWriter with capacity
126+
/// based on the throttle's maximum capacity. So when the block cache outputs data roughly
127+
/// equal to the throttle's max rate, the BufWriter will flush roughly every second.
128+
/// However when blocks are small relative to the maximum possible rate, the BufWriter
129+
/// will flush less frequently. This setting allows you to force a flush after writing to N blocks
130+
/// to create a more consistent flush interval.
131+
#[serde(default)]
132+
pub flush_every_n_blocks: Option<NonZeroU32>,
125133
}
126134

127135
#[derive(Debug)]
@@ -201,6 +209,7 @@ impl Server {
201209
file_index: Arc::clone(&file_index),
202210
rotate: config.rotate,
203211
shutdown: shutdown.clone(),
212+
flush_every_n_blocks: config.flush_every_n_blocks,
204213
};
205214

206215
handles.spawn(child.spin());
@@ -276,6 +285,7 @@ struct Child {
276285
rotate: bool,
277286
file_index: Arc<AtomicU32>,
278287
shutdown: lading_signal::Watcher,
288+
flush_every_n_blocks: Option<NonZeroU32>,
279289
}
280290

281291
impl Child {
@@ -289,7 +299,7 @@ impl Child {
289299

290300
let mut handle = self.block_cache.handle();
291301
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
292-
// (converted to bytes if necessary) to approximate flush every second.
302+
// (converted to bytes if necessary) to approximate flush every second. HOWEVER, this
293303
let buffer_capacity = match self.throttle.mode {
294304
ThrottleMode::Bytes => self.throttle.maximum_capacity() as usize,
295305
ThrottleMode::Blocks => usize::try_from(
@@ -319,6 +329,7 @@ impl Child {
319329
);
320330
let shutdown_wait = self.shutdown.recv();
321331
tokio::pin!(shutdown_wait);
332+
let mut blocks_since_flush = 0;
322333
loop {
323334
tokio::select! {
324335
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
@@ -331,10 +342,12 @@ impl Child {
331342
fp.write_all(&block.bytes).await?;
332343
counter!("bytes_written").increment(total_bytes);
333344
total_bytes_written += total_bytes;
345+
blocks_since_flush += 1;
334346
}
335347

336348
if total_bytes_written > maximum_bytes_per_file {
337349
fp.flush().await?;
350+
blocks_since_flush = 0;
338351
if self.rotate {
339352
// Delete file, leaving any open file handlers intact. This
340353
// includes our own `fp` for the time being.
@@ -362,6 +375,11 @@ impl Child {
362375
})?,
363376
);
364377
total_bytes_written = 0;
378+
} else if let Some(flush_every_n_blocks) = self.flush_every_n_blocks {
379+
if blocks_since_flush == u32::from(flush_every_n_blocks.get()) {
380+
fp.flush().await?;
381+
blocks_since_flush = 0;
382+
}
365383
}
366384
}
367385
Err(err) => {

0 commit comments

Comments
 (0)