Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ impl FlussConnection {
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_numeric_fields()
.map_err(|msg| Error::IllegalArgument { message: msg })?;

let timeout = Duration::from_millis(arg.connect_timeout_ms);
let connections = if arg.is_sasl_enabled() {
Expand Down
128 changes: 128 additions & 0 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,39 @@ impl Config {
}
Ok(())
}
pub fn validate_numeric_fields(&self) -> Result<(), String> {
if self.writer_request_max_size <= 0 {
return Err("writer_request_max_size must be > 0".to_string());
}
if self.writer_batch_size <= 0 {
return Err("writer_batch_size must be > 0".to_string());
}
if self.writer_batch_timeout_ms < 0 {
return Err("writer_batch_timeout_ms must be >= 0".to_string());
}
if self.remote_file_download_thread_num == 0 {
return Err("remote_file_download_thread_num must be > 0".to_string());
}
if self.scanner_remote_log_prefetch_num == 0 {
return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
}
if self.scanner_remote_log_read_concurrency == 0 {
return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
}
if self.scanner_log_max_poll_records == 0 {
return Err("scanner_log_max_poll_records must be > 0".to_string());
}
if self.writer_max_inflight_requests_per_bucket == 0 {
return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string());
}
if self.writer_buffer_memory_size == 0 {
return Err("writer_buffer_memory_size must be > 0".to_string());
}
if self.connect_timeout_ms == 0 {
return Err("connect_timeout_ms must be > 0".to_string());
}
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -494,4 +527,99 @@ mod tests {
};
assert!(config.validate_idempotence().is_err());
}
#[test]
fn test_numeric_fields_defaults_valid() {
let config = Config::default();
assert!(config.validate_numeric_fields().is_ok());
}

#[test]
fn test_writer_request_max_size_zero() {
let config = Config {
writer_request_max_size: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_writer_batch_size_zero() {
let config = Config {
writer_batch_size: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_writer_batch_timeout_negative() {
let config = Config {
writer_batch_timeout_ms: -1,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_remote_file_download_thread_num_zero() {
let config = Config {
remote_file_download_thread_num: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_scanner_remote_log_prefetch_num_zero() {
let config = Config {
scanner_remote_log_prefetch_num: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_scanner_remote_log_read_concurrency_zero() {
let config = Config {
scanner_remote_log_read_concurrency: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_scanner_log_max_poll_records_zero() {
let config = Config {
scanner_log_max_poll_records: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_writer_max_inflight_requests_per_bucket_zero() {
let config = Config {
writer_max_inflight_requests_per_bucket: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_writer_buffer_memory_size_zero() {
let config = Config {
writer_buffer_memory_size: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}

#[test]
fn test_connect_timeout_ms_zero() {
let config = Config {
connect_timeout_ms: 0,
..Config::default()
};
assert!(config.validate_numeric_fields().is_err());
}
}