Skip to content

Commit 4aba943

Browse files
committed
flush every
1 parent 1692f26 commit 4aba943

2 files changed

Lines changed: 71 additions & 8 deletions

File tree

lading/src/generator/file_gen/logrotate.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ use tokio::{
3030
io::{AsyncWriteExt, BufWriter},
3131
task::{JoinError, JoinHandle},
3232
};
33-
use tracing::{error, info};
33+
use tracing::{error, info, warn};
3434

3535
use lading_payload::block;
3636

3737
use super::General;
3838
use crate::generator::common::{
39-
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
39+
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
40+
create_throttle,
4041
};
4142

4243
/// An enum to allow us to determine what operation caused an IO errror as the
@@ -150,6 +151,14 @@ pub struct Config {
150151
/// Throughput profile controlling emission rate (bytes or blocks).
151152
#[serde(default)]
152153
pub throttle: Option<ThrottleConfig>,
154+
/// Force flush every n blocks. This generator uses a `BufWriter` with capacity
155+
/// based on the throttle's maximum capacity. So when the block cache outputs data roughly
156+
/// equal to the throttle's max rate, the `BufWriter` will flush roughly every second.
157+
/// However when blocks are small relative to the maximum possible rate, the `BufWriter`
158+
/// will flush less frequently. This setting allows you to force a flush after writing to N blocks
159+
/// to create a more consistent flush interval.
160+
#[serde(default)]
161+
pub flush_every_n_blocks: Option<NonZeroU32>,
153162
}
154163

155164
#[derive(Debug)]
@@ -240,6 +249,7 @@ impl Server {
240249
throughput_throttle,
241250
shutdown.clone(),
242251
child_labels,
252+
config.flush_every_n_blocks,
243253
);
244254

245255
handles.push(tokio::spawn(child.spin()));
@@ -290,6 +300,7 @@ struct Child {
290300
throttle: BlockThrottle,
291301
shutdown: lading_signal::Watcher,
292302
labels: Vec<(String, String)>,
303+
flush_every_n_blocks: Option<NonZeroU32>,
293304
}
294305

295306
impl Child {
@@ -303,6 +314,7 @@ impl Child {
303314
throttle: BlockThrottle,
304315
shutdown: lading_signal::Watcher,
305316
labels: Vec<(String, String)>,
317+
flush_every_n_blocks: Option<NonZeroU32>,
306318
) -> Self {
307319
let mut names = Vec::with_capacity((total_rotations + 1).into());
308320
names.push(PathBuf::from(basename));
@@ -325,16 +337,29 @@ impl Child {
325337
throttle,
326338
shutdown,
327339
labels,
340+
flush_every_n_blocks,
328341
}
329342
}
330343

331344
async fn spin(mut self) -> Result<(), Error> {
332345
let mut handle = self.block_cache.handle();
346+
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
347+
// (converted to bytes if necessary) to approximate flush every second (ASSUMING that throttler will write
348+
// bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than
349+
// the throttle's maximum block size, this will flush less frequently and thus it's recommended to use
350+
// the flush_every_n_blocks setting).
351+
if self.throttle.mode == ThrottleMode::Blocks && self.flush_every_n_blocks.is_none() {
352+
warn!(
353+
"BufWriter flush frequency can be inconsistent when using block-based throttling - \
354+
consider setting flush_every_n_blocks in your generator config"
355+
);
356+
}
333357
let buffer_capacity = self
334358
.throttle
335359
.maximum_capacity_bytes(self.maximum_block_size);
336360
let mut total_bytes_written: u64 = 0;
337361
let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
362+
let mut blocks_since_flush: u32 = 0;
338363

339364
let total_names = self.names.len();
340365
// SAFETY: By construction there is guaranteed to be at least one name.
@@ -374,7 +399,7 @@ impl Child {
374399
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
375400
match result {
376401
Ok(()) => {
377-
write_bytes(self.block_cache.advance(&mut handle),
402+
let did_rotate = write_bytes(self.block_cache.advance(&mut handle),
378403
&mut fp,
379404
&mut total_bytes_written,
380405
buffer_capacity,
@@ -383,6 +408,15 @@ impl Child {
383408
&self.names,
384409
last_name,
385410
&self.labels).await?;
411+
if did_rotate {
412+
blocks_since_flush = 0;
413+
} else {
414+
blocks_since_flush += 1;
415+
if self.flush_every_n_blocks.is_some_and(|n| blocks_since_flush == n.get()) {
416+
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
417+
blocks_since_flush = 0;
418+
}
419+
}
386420
}
387421
Err(err) => {
388422
error!("Discarding block due to throttle error: {err}");
@@ -401,6 +435,8 @@ impl Child {
401435
}
402436
}
403437

438+
/// Writes a block to the file, rotating if necessary.
439+
/// Returns `true` if a rotation occurred (and thus the file was flushed).
404440
#[allow(clippy::too_many_arguments)]
405441
async fn write_bytes(
406442
blk: &block::Block,
@@ -412,7 +448,7 @@ async fn write_bytes(
412448
names: &[PathBuf],
413449
last_name: &Path,
414450
labels: &[(String, String)],
415-
) -> Result<(), Error> {
451+
) -> Result<bool, Error> {
416452
let total_bytes = u64::from(blk.total_bytes.get());
417453

418454
{
@@ -479,7 +515,8 @@ async fn write_bytes(
479515
})?,
480516
);
481517
*total_bytes_written = 0;
518+
return Ok(true);
482519
}
483520

484-
Ok(())
521+
Ok(false)
485522
}

lading/src/generator/file_gen/traditional.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ use tokio::{
3131
io::{AsyncWriteExt, BufWriter},
3232
task::{JoinError, JoinSet},
3333
};
34-
use tracing::{error, info};
34+
use tracing::{error, info, warn};
3535

3636
use lading_payload::{self, block};
3737

3838
use super::General;
3939
use crate::generator::common::{
40-
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
40+
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
41+
create_throttle,
4142
};
4243

4344
#[derive(thiserror::Error, Debug)]
@@ -121,6 +122,14 @@ pub struct Config {
121122
rotate: bool,
122123
/// The load throttle configuration
123124
pub throttle: Option<ThrottleConfig>,
125+
/// Force flush every n blocks. This generator uses a `BufWriter` with capacity
126+
/// based on the throttle's maximum capacity. So when the block cache outputs data roughly
127+
/// equal to the throttle's max rate, the `BufWriter` will flush roughly every second.
128+
/// However when blocks are small relative to the maximum possible rate, the `BufWriter`
129+
/// will flush less frequently. This setting allows you to force a flush after writing to N blocks
130+
/// to create a more consistent flush interval.
131+
#[serde(default)]
132+
pub flush_every_n_blocks: Option<NonZeroU32>,
124133
}
125134

126135
#[derive(Debug)]
@@ -200,6 +209,7 @@ impl Server {
200209
file_index: Arc::clone(&file_index),
201210
rotate: config.rotate,
202211
shutdown: shutdown.clone(),
212+
flush_every_n_blocks: config.flush_every_n_blocks,
203213
};
204214

205215
handles.spawn(child.spin());
@@ -275,6 +285,7 @@ struct Child {
275285
rotate: bool,
276286
file_index: Arc<AtomicU32>,
277287
shutdown: lading_signal::Watcher,
288+
flush_every_n_blocks: Option<NonZeroU32>,
278289
}
279290

280291
impl Child {
@@ -288,7 +299,16 @@ impl Child {
288299

289300
let mut handle = self.block_cache.handle();
290301
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
291-
// (converted to bytes if necessary) to approximate flush every second.
302+
// (converted to bytes if necessary) to approximate flush every second (ASSUMING that throttler will write
303+
// bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than
304+
// the throttle's maximum block size, this will flush less frequently and thus it's reccomended to use
305+
// the flush_every_n_blocks setting).
306+
if self.throttle.mode == ThrottleMode::Blocks && self.flush_every_n_blocks.is_none() {
307+
warn!(
308+
"BufWriter flush frequency can be inconsistent when using block-based throttling -
309+
consider setting flush_every_n_blocks in your generator config"
310+
);
311+
}
292312
let buffer_capacity = self
293313
.throttle
294314
.maximum_capacity_bytes(self.maximum_block_size);
@@ -312,6 +332,7 @@ impl Child {
312332
);
313333
let shutdown_wait = self.shutdown.recv();
314334
tokio::pin!(shutdown_wait);
335+
let mut blocks_since_flush = 0;
315336
loop {
316337
tokio::select! {
317338
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
@@ -324,10 +345,12 @@ impl Child {
324345
fp.write_all(&block.bytes).await?;
325346
counter!("bytes_written").increment(total_bytes);
326347
total_bytes_written += total_bytes;
348+
blocks_since_flush += 1;
327349
}
328350

329351
if total_bytes_written > maximum_bytes_per_file {
330352
fp.flush().await?;
353+
blocks_since_flush = 0;
331354
if self.rotate {
332355
// Delete file, leaving any open file handlers intact. This
333356
// includes our own `fp` for the time being.
@@ -355,6 +378,9 @@ impl Child {
355378
})?,
356379
);
357380
total_bytes_written = 0;
381+
} else if self.flush_every_n_blocks.is_some_and(|n| blocks_since_flush == n.get()) {
382+
fp.flush().await?;
383+
blocks_since_flush = 0;
358384
}
359385
}
360386
Err(err) => {

0 commit comments

Comments
 (0)