diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 78e9362b..e31185f0 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -45,6 +45,8 @@ impl FlussConnection { .map_err(|msg| Error::IllegalArgument { message: msg })?; arg.validate_scanner_fetch() .map_err(|msg| Error::IllegalArgument { message: msg })?; + arg.validate_numeric_fields() + .map_err(|msg| Error::IllegalArgument { message: msg })?; let timeout = Duration::from_millis(arg.connect_timeout_ms); let connections = if arg.is_sasl_enabled() { diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 32db44f6..2a3507d4 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -350,6 +350,39 @@ impl Config { } Ok(()) } + pub fn validate_numeric_fields(&self) -> Result<(), String> { + if self.writer_request_max_size <= 0 { + return Err("writer_request_max_size must be > 0".to_string()); + } + if self.writer_batch_size <= 0 { + return Err("writer_batch_size must be > 0".to_string()); + } + if self.writer_batch_timeout_ms < 0 { + return Err("writer_batch_timeout_ms must be >= 0".to_string()); + } + if self.remote_file_download_thread_num == 0 { + return Err("remote_file_download_thread_num must be > 0".to_string()); + } + if self.scanner_remote_log_prefetch_num == 0 { + return Err("scanner_remote_log_prefetch_num must be > 0".to_string()); + } + if self.scanner_remote_log_read_concurrency == 0 { + return Err("scanner_remote_log_read_concurrency must be > 0".to_string()); + } + if self.scanner_log_max_poll_records == 0 { + return Err("scanner_log_max_poll_records must be > 0".to_string()); + } + if self.writer_max_inflight_requests_per_bucket == 0 { + return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string()); + } + if self.writer_buffer_memory_size == 0 { + return Err("writer_buffer_memory_size must be > 0".to_string()); + } + if self.connect_timeout_ms == 0 { + return Err("connect_timeout_ms must be > 0".to_string()); + } + Ok(()) + } } #[cfg(test)] @@ -494,4 +527,99 @@ mod tests { }; assert!(config.validate_idempotence().is_err()); } + #[test] + fn test_numeric_fields_defaults_valid() { + let config = Config::default(); + assert!(config.validate_numeric_fields().is_ok()); + } + + #[test] + fn test_writer_request_max_size_zero() { + let config = Config { + writer_request_max_size: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_writer_batch_size_zero() { + let config = Config { + writer_batch_size: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_writer_batch_timeout_negative() { + let config = Config { + writer_batch_timeout_ms: -1, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_remote_file_download_thread_num_zero() { + let config = Config { + remote_file_download_thread_num: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_scanner_remote_log_prefetch_num_zero() { + let config = Config { + scanner_remote_log_prefetch_num: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_scanner_remote_log_read_concurrency_zero() { + let config = Config { + scanner_remote_log_read_concurrency: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_scanner_log_max_poll_records_zero() { + let config = Config { + scanner_log_max_poll_records: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_writer_max_inflight_requests_per_bucket_zero() { + let config = Config { + writer_max_inflight_requests_per_bucket: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_writer_buffer_memory_size_zero() { + let config = Config { + writer_buffer_memory_size: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } + + #[test] + fn test_connect_timeout_ms_zero() { + let config = Config { + connect_timeout_ms: 0, + ..Config::default() + }; + assert!(config.validate_numeric_fields().is_err()); + } }