feat: Add request timeout for rpc #399
Changes from 9 commits
```diff
@@ -70,7 +70,8 @@ impl fmt::Debug for SaslConfig {
 pub struct RpcClient {
     connections: RwLock<HashMap<String, ServerConnection>>,
     client_id: Arc<str>,
-    timeout: Option<Duration>,
+    connect_timeout: Option<Duration>,
+    request_timeout: Option<Duration>,
     max_message_size: usize,
     sasl_config: Option<SaslConfig>,
 }
```
```diff
@@ -80,14 +81,25 @@ impl RpcClient {
         RpcClient {
             connections: Default::default(),
             client_id: Arc::from(""),
-            timeout: None,
+            connect_timeout: None,
+            request_timeout: None,
             max_message_size: usize::MAX,
             sasl_config: None,
         }
     }

-    pub fn with_timeout(mut self, timeout: Duration) -> Self {
-        self.timeout = Some(timeout);
+    /// Set the timeout used when establishing TCP connections.
+    ///
+    /// Compatibility note: this builder was previously named `with_timeout`.
+    /// It was renamed to make timeout semantics explicit now that
+    /// `with_request_timeout` is also available.
+    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
+        self.connect_timeout = Some(timeout);
         self
     }
+
+    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
+        self.request_timeout = Some(timeout);
+        self
+    }
```
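For reference, a minimal usage sketch of how the two builders combine (illustrative only; a `new()` constructor is assumed from the hunk above, and endpoint setup is elided):

```rust
use std::time::Duration;

// connect_timeout bounds TCP establishment; request_timeout bounds each
// response wait. Both remain optional and independent of each other.
let client = RpcClient::new()
    .with_connect_timeout(Duration::from_secs(5))
    .with_request_timeout(Duration::from_secs(30));
```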
```diff
@@ -125,14 +137,15 @@ impl RpcClient {
     async fn connect(&self, server_node: &ServerNode) -> Result<ServerConnection, Error> {
         let url = server_node.url();
-        let transport = Transport::connect(&url, self.timeout)
+        let transport = Transport::connect(&url, self.connect_timeout)
             .await
             .map_err(|error| ConnectionError(error.to_string()))?;

         let messenger = ServerConnectionInner::new(
             BufStream::new(transport),
             self.max_message_size,
             self.client_id.clone(),
+            self.request_timeout,
         );
         let connection = ServerConnection::new(messenger);
```
```diff
@@ -266,14 +279,25 @@ pub struct ServerConnectionInner<RW> {
     state: Arc<Mutex<ConnectionState>>,

+    /// Per-request timeout applied to the response-wait phase only.
+    /// The send (write) phase is not covered; a stalled `send_message` can
+    /// exceed this duration.
+    /// TODO: Full RPC deadline semantics are a potential future enhancement.
+    request_timeout: Option<Duration>,
+
     join_handle: JoinHandle<()>,
 }

 impl<RW> ServerConnectionInner<RW>
 where
     RW: AsyncRead + AsyncWrite + Send + 'static,
 {
-    pub fn new(stream: RW, max_message_size: usize, client_id: Arc<str>) -> Self {
+    pub fn new(
+        stream: RW,
+        max_message_size: usize,
+        client_id: Arc<str>,
+        request_timeout: Option<Duration>,
+    ) -> Self {
        let (stream_read, stream_write) = tokio::io::split(stream);
        let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default())));
        let state_captured = Arc::clone(&state);
```
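As a rough sketch of the TODO in that doc comment (purely illustrative, not part of this PR): full deadline semantics would apply one time budget across both the write phase and the response wait, e.g. by wrapping the combined future in `tokio::time::timeout`. Here `send` and `recv` are hypothetical stand-ins for the real send/receive futures.

```rust
use std::future::Future;
use std::time::Duration;
use tokio::time::{error::Elapsed, timeout};

// Illustrative only: a single budget covers write + response wait, so a
// stalled send phase can no longer exceed the configured duration.
async fn request_with_deadline<S, R, T, E>(
    budget: Duration,
    send: S,
    recv: R,
) -> Result<Result<T, E>, Elapsed>
where
    S: Future<Output = Result<(), E>>,
    R: Future<Output = Result<T, E>>,
{
    timeout(budget, async {
        send.await?;
        recv.await
    })
    .await
}
```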
```diff
@@ -301,9 +325,9 @@ where
                 match map.remove(&header.request_id) {
                     Some(active_request) => active_request,
                     _ => {
-                        log::warn!(
+                        log::debug!(
                             request_id:% = header.request_id;
-                            "Got response for unknown request",
+                            "Ignoring response for unknown request (likely timed out or cancelled)",
                         );
                         continue;
                     }
```

Contributor: nit: why change to debug?

Author: I think debug fits better here, because this is a normal late-response race after a timeout/cancel rather than an operational warning, IMHO. In a production environment we would probably use a metric/counter, which is better than warn spamming. WDYT? @luoyuxia

Author: Let me know what you think of this; in the meantime, I rebased and resolved conflicts. Thanks a lot! @luoyuxia 🙏

Contributor: I agree this is a normal late-response race. However, a request timeout still suggests some underlying issue or risk that users should be aware of. Metrics would help, but logs are often more immediately useful in practice. Fluss's Java client logs this as warn. I'm not sure whether the Kafka client does the same for a similar case, but the Kafka client may be a better reference if we want to follow common client behavior. So I'm slightly leaning toward keeping warn for now.

Author: Thanks for the response @luoyuxia. I checked the Kafka client: it seems Kafka closes the connection on timeout, so IIUC late responses can never arrive, i.e. Kafka ignores responses for a request after timeout. But it seems it treats it as …, so I think here I will switch to ….

Author: Updated! PTAL @luoyuxia, thank you!
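For what it's worth, the metric/counter alternative floated in this thread could be as simple as an atomic counter that a metrics layer exports. This is a hypothetical sketch; the client currently exposes no such counter.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical: count responses that arrive after their request was timed
// out or cancelled, instead of (or alongside) logging each occurrence.
static LATE_RESPONSES: AtomicU64 = AtomicU64::new(0);

fn record_late_response() {
    LATE_RESPONSES.fetch_add(1, Ordering::Relaxed);
}
```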
```diff
@@ -337,6 +361,7 @@ where
             client_id,
             request_id: AtomicI32::new(0),
             state,
+            request_timeout,
             join_handle,
         }
     }
```
```diff
@@ -388,8 +413,29 @@ where
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
-        let mut response = rx.await.map_err(|e| Error::UnexpectedError {
-            message: "Got recvError, some one close the channel".to_string(),
+        let recv_result = match self.request_timeout {
+            Some(timeout) => match tokio::time::timeout(timeout, rx).await {
+                Ok(result) => result,
+                Err(_elapsed) => {
+                    if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() {
+                        map.remove(&request_id);
+                    }
+                    return Err(RpcError::RequestTimeout {
+                        timeout,
+                        api_key: R::API_KEY,
+                    }
+                    .into());
+                }
+            },
+            None => rx.await,
+        };
+
+        let mut response = recv_result.map_err(|e| Error::UnexpectedError {
+            message: format!(
+                "Response channel closed for request_id={request_id} api_key={:?}; \
+                 connection may be closed or poisoned",
+                R::API_KEY
+            ),
             source: Some(Box::new(e)),
         })??;
```
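The core mechanism in this hunk is `tokio::time::timeout` wrapping the oneshot receiver: on expiry the receiver future is dropped, so the pending entry must be removed from the request map explicitly, as the hunk does. A minimal, self-contained illustration of that behavior:

```rust
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // The sender stays alive but never sends, mimicking a server that
    // never responds; the timeout fires and `rx` is dropped.
    let (_tx, rx) = oneshot::channel::<()>();
    let result = timeout(Duration::from_millis(50), rx).await;
    assert!(result.is_err()); // Elapsed: nothing arrived in time.
}
```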
```diff
@@ -561,3 +607,72 @@ impl Drop for CleanupRequestStateOnCancel {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::TablePath;
+    use crate::rpc::message::TableExistsRequest;
+
+    #[tokio::test]
+    async fn test_request_timeout() {
+        // Create a duplex stream where the "server" side never responds.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            Some(Duration::from_millis(500)),
+        );
+
+        let table_path = TablePath::new("db", "table");
+        let request = TableExistsRequest::new(&table_path);
+        let result = conn.request(request).await;
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(
+            matches!(
+                err,
+                Error::RpcError {
+                    source: RpcError::RequestTimeout { .. },
+                    ..
+                }
+            ),
+            "expected RequestTimeout, got: {err}"
+        );
+
+        // Timeout must not poison the connection; other requests should still work.
+        assert!(!conn.is_poisoned());
+
+        // The timed-out request must be removed from the request map (no state leak).
+        if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() {
+            assert!(map.is_empty(), "request map should be empty after timeout");
+        } else {
+            panic!("connection should not be poisoned after a timeout");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_request_no_timeout() {
+        // With no request timeout configured, the request should remain pending
+        // when the server does not respond.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            None,
+        );
+
+        let table_path = TablePath::new("db", "table");
+        let request = TableExistsRequest::new(&table_path);
+        let pending =
+            tokio::time::timeout(Duration::from_millis(300), conn.request(request)).await;
+        assert!(
+            pending.is_err(),
+            "expected request to remain pending without per-request timeout",
+        );
+    }
+}
```