diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 79561a12..dba03715 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1023,6 +1023,8 @@ struct Configuration { int64_t writer_batch_timeout_ms{100}; // Connect timeout in milliseconds for TCP transport connect uint64_t connect_timeout_ms{120000}; + // Request timeout in milliseconds for individual RPC calls + uint64_t request_timeout_ms{30000}; // Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth) std::string security_protocol{"PLAINTEXT"}; // SASL mechanism (only "PLAIN" is supported) diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 754ed0fd..e8f10259 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -70,6 +70,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket; ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; ffi_config.connect_timeout_ms = config.connect_timeout_ms; + ffi_config.request_timeout_ms = config.request_timeout_ms; ffi_config.security_protocol = rust::String(config.security_protocol); ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism); ffi_config.security_sasl_username = rust::String(config.security_sasl_username); diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 8d5153e6..90009848 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -54,6 +54,7 @@ mod ffi { scanner_log_fetch_max_bytes_for_bucket: i32, writer_batch_timeout_ms: i64, connect_timeout_ms: u64, + request_timeout_ms: u64, security_protocol: String, security_sasl_mechanism: String, security_sasl_username: String, @@ -677,6 +678,7 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { scanner_log_fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms, scanner_log_fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket, connect_timeout_ms: config.connect_timeout_ms, + request_timeout_ms: config.request_timeout_ms, security_protocol: config.security_protocol.to_string(), security_sasl_mechanism: config.security_sasl_mechanism.to_string(), security_sasl_username: config.security_sasl_username.to_string(), diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 20b259e8..fa360e4b 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -174,6 +174,10 @@ class Config: @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... @property + def request_timeout_ms(self) -> int: ... + @request_timeout_ms.setter + def request_timeout_ms(self, timeout: int) -> None: ... + @property def scanner_log_fetch_max_bytes(self) -> int: ... @scanner_log_fetch_max_bytes.setter def scanner_log_fetch_max_bytes(self, bytes: int) -> None: ... diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index fd3c980b..767522d0 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -136,6 +136,11 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "request-timeout" => { + config.request_timeout_ms = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } "security.protocol" => { config.security_protocol = value; } @@ -308,6 +313,18 @@ impl Config { self.inner.connect_timeout_ms = timeout; } + /// Get the request timeout in milliseconds + #[getter] + fn request_timeout_ms(&self) -> u64 { + self.inner.request_timeout_ms + } + + /// Set the request timeout in milliseconds + #[setter] + fn set_request_timeout_ms(&mut self, timeout: u64) { + self.inner.request_timeout_ms = timeout; + } + /// Get the security protocol #[getter] fn security_protocol(&self) -> String { diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 2610f6da..7cb04dd7 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -43,6 +43,7 @@ impl FlussConnection { .map_err(|msg| Error::IllegalArgument { message: msg })?; let timeout = Duration::from_millis(arg.connect_timeout_ms); + let request_timeout = Duration::from_millis(arg.request_timeout_ms); let connections = if arg.is_sasl_enabled() { Arc::new( RpcClient::new() @@ -50,10 +51,15 @@ impl FlussConnection { arg.security_sasl_username.clone(), arg.security_sasl_password.clone(), ) - .with_timeout(timeout), + .with_connect_timeout(timeout) + .with_request_timeout(request_timeout), ) } else { - Arc::new(RpcClient::new().with_timeout(timeout)) + Arc::new( + RpcClient::new() + .with_connect_timeout(timeout) + .with_request_timeout(request_timeout), + ) }; let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?; diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index e85a4492..90fce2cf 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -35,6 +35,7 @@ const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024; const DEFAULT_ACKS: &str = "all"; const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000; +const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000; const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT"; const DEFAULT_SASL_MECHANISM: &str = "PLAIN"; @@ -125,6 +126,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)] pub connect_timeout_ms: u64, + /// Request timeout in milliseconds for individual RPC calls. + /// Default: 30000 (30 seconds). + #[arg(long, default_value_t = DEFAULT_REQUEST_TIMEOUT_MS)] + pub request_timeout_ms: u64, + #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))] pub security_protocol: String, @@ -181,6 +187,7 @@ impl std::fmt::Debug for Config { ) .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms) .field("connect_timeout_ms", &self.connect_timeout_ms) + .field("request_timeout_ms", &self.request_timeout_ms) .field("security_protocol", &self.security_protocol) .field("security_sasl_mechanism", &self.security_sasl_mechanism) .field("security_sasl_username", &self.security_sasl_username) @@ -208,6 +215,7 @@ impl Default for Config { scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS, + request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS, security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL), security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM), security_sasl_username: String::new(), diff --git a/crates/fluss/src/rpc/error.rs b/crates/fluss/src/rpc/error.rs index da3a11e2..85be83ed 100644 --- a/crates/fluss/src/rpc/error.rs +++ b/crates/fluss/src/rpc/error.rs @@ -19,6 +19,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::api_version::ApiVersion; use prost::DecodeError; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; #[derive(Error, Debug)] @@ -51,4 +52,7 @@ pub enum RpcError { api_key: ApiKey, api_version: ApiVersion, }, + + #[error("Request timed out after {timeout:?} for api_key={api_key:?}")] + RequestTimeout { timeout: Duration, api_key: ApiKey }, } diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 13c5d9ca..ae5f2c3b 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -70,7 +70,8 @@ impl fmt::Debug for SaslConfig { pub struct RpcClient { connections: RwLock>, client_id: Arc, - timeout: Option, + connect_timeout: Option, + request_timeout: Option, max_message_size: usize, sasl_config: Option, } @@ -80,14 +81,25 @@ impl RpcClient { RpcClient { connections: Default::default(), client_id: Arc::from(""), - timeout: None, + connect_timeout: None, + request_timeout: None, max_message_size: usize::MAX, sasl_config: None, } } - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(timeout); + /// Set the timeout used when establishing TCP connections. + /// + /// Compatibility note: this builder was previously named `with_timeout`. + /// It was renamed to make timeout semantics explicit now that + /// `with_request_timeout` is also available. + pub fn with_connect_timeout(mut self, timeout: Duration) -> Self { + self.connect_timeout = Some(timeout); + self + } + + pub fn with_request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = Some(timeout); self } @@ -125,7 +137,7 @@ impl RpcClient { async fn connect(&self, server_node: &ServerNode) -> Result { let url = server_node.url(); - let transport = Transport::connect(&url, self.timeout) + let transport = Transport::connect(&url, self.connect_timeout) .await .map_err(|error| ConnectionError(error.to_string()))?; @@ -133,6 +145,7 @@ impl RpcClient { BufStream::new(transport), self.max_message_size, self.client_id.clone(), + self.request_timeout, ); let connection = ServerConnection::new(messenger); @@ -266,6 +279,12 @@ pub struct ServerConnectionInner { state: Arc>, + /// Per-request timeout applied to the response-wait phase only. + /// The send (write) phase is not covered; a stalled `send_message` can + /// exceed this duration. + /// TODO: Full RPC deadline semantics are a potential future enhancement. + request_timeout: Option, + join_handle: JoinHandle<()>, } @@ -273,7 +292,12 @@ impl ServerConnectionInner where RW: AsyncRead + AsyncWrite + Send + 'static, { - pub fn new(stream: RW, max_message_size: usize, client_id: Arc) -> Self { + pub fn new( + stream: RW, + max_message_size: usize, + client_id: Arc, + request_timeout: Option, + ) -> Self { let (stream_read, stream_write) = tokio::io::split(stream); let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default()))); let state_captured = Arc::clone(&state); @@ -303,7 +327,7 @@ where _ => { log::warn!( request_id:% = header.request_id; - "Got response for unknown request", + "Ignoring response for unknown request (likely timed out or cancelled)", ); continue; } @@ -337,6 +361,7 @@ where client_id, request_id: AtomicI32::new(0), state, + request_timeout, join_handle, } } @@ -388,8 +413,29 @@ where self.send_message(buf).await?; _cleanup_on_cancel.message_sent(); - let mut response = rx.await.map_err(|e| Error::UnexpectedError { - message: "Got recvError, some one close the channel".to_string(), + let recv_result = match self.request_timeout { + Some(timeout) => match tokio::time::timeout(timeout, rx).await { + Ok(result) => result, + Err(_elapsed) => { + if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() { + map.remove(&request_id); + } + return Err(RpcError::RequestTimeout { + timeout, + api_key: R::API_KEY, + } + .into()); + } + }, + None => rx.await, + }; + + let mut response = recv_result.map_err(|e| Error::UnexpectedError { + message: format!( + "Response channel closed for request_id={request_id} api_key={:?}; \ + connection may be closed or poisoned", + R::API_KEY + ), source: Some(Box::new(e)), })??; @@ -561,3 +607,72 @@ impl Drop for CleanupRequestStateOnCancel { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::TablePath; + use crate::rpc::message::TableExistsRequest; + + #[tokio::test] + async fn test_request_timeout() { + // Create a duplex stream where the "server" side never responds. + let (client_stream, _server_stream) = tokio::io::duplex(1024); + + let conn = ServerConnectionInner::new( + BufStream::new(client_stream), + usize::MAX, + Arc::from("test"), + Some(Duration::from_millis(500)), + ); + + let table_path = TablePath::new("db", "table"); + let request = TableExistsRequest::new(&table_path); + let result = conn.request(request).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + matches!( + err, + Error::RpcError { + source: RpcError::RequestTimeout { .. }, + .. + } + ), + "expected RequestTimeout, got: {err}" + ); + + // Timeout must not poison the connection — other requests should still work. + assert!(!conn.is_poisoned()); + + // The timed-out request must be removed from the request map (no state leak). + if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() { + assert!(map.is_empty(), "request map should be empty after timeout"); + } else { + panic!("connection should not be poisoned after a timeout"); + } + } + + #[tokio::test] + async fn test_request_no_timeout() { + // With no request timeout configured, request should remain pending + // when the server does not respond. + let (client_stream, _server_stream) = tokio::io::duplex(1024); + + let conn = ServerConnectionInner::new( + BufStream::new(client_stream), + usize::MAX, + Arc::from("test"), + None, + ); + + let table_path = TablePath::new("db", "table"); + let request = TableExistsRequest::new(&table_path); + let pending = tokio::time::timeout(Duration::from_millis(300), conn.request(request)).await; + assert!( + pending.is_err(), + "expected request to remain pending without per-request timeout", + ); + } +} diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index d14cf16d..b4b25126 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -33,6 +33,7 @@ Complete API reference for the Fluss C++ client. | `scanner_log_fetch_wait_max_time_ms` | `int32_t` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes | | `scanner_log_fetch_max_bytes_for_bucket`| `int32_t` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for LogScanner | | `connect_timeout_ms` | `uint64_t` | `120000` | TCP connect timeout in milliseconds | +| `request_timeout_ms` | `uint64_t` | `30000` | Timeout in ms while waiting for an RPC response after the request is sent (request write/send can take longer) | | `security_protocol` | `std::string` | `"PLAINTEXT"` | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth | | `security_sasl_mechanism` | `std::string` | `"PLAIN"` | SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `std::string` | (empty) | SASL username (required when protocol is `"sasl"`) | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index a4b594bc..2378f824 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -26,6 +26,7 @@ Complete API reference for the Fluss Python client. | `scanner_log_fetch_wait_max_time_ms` | `scanner.log.fetch.wait-max-time-ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes | | `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for LogScanner | | `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds | +| `request_timeout_ms` | `request-timeout` | Get/set max time in ms to wait for an RPC response after the request is sent (does not limit request write/send time) | | `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) | | `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 7f455226..ae5f4ed5 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -5,6 +5,10 @@ sidebar_position: 2 Complete API reference for the Fluss Rust client. +> Compatibility note: `RpcClient::with_timeout` was renamed to +> `RpcClient::with_connect_timeout` to distinguish it from +> `RpcClient::with_request_timeout`. + ## `Config` | Field | Type | Default | Description | @@ -25,6 +29,7 @@ Complete API reference for the Fluss Rust client. | `scanner_log_fetch_wait_max_time_ms` | `i32` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes | | `scanner_log_fetch_max_bytes_for_bucket`| `i32` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for LogScanner | | `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds | +| `request_timeout_ms` | `u64` | `30000` | Timeout in ms while waiting for an RPC response after the request is sent (request write/send can take longer) | | `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth | | `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) | | `security_sasl_username` | `String` | (empty) | SASL username (required when protocol is `sasl`) |