-
Notifications
You must be signed in to change notification settings - Fork 39
feat: Add request timeout for rpc #399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
652c72e
d14b7a3
463ef92
2a71f01
2e3b23c
2dcf00d
40e0346
6fca964
0e446f8
cf6dd58
6620fa0
550bdd2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,7 @@ pub struct RpcClient { | |
| connections: RwLock<HashMap<String, ServerConnection>>, | ||
| client_id: Arc<str>, | ||
| timeout: Option<Duration>, | ||
| request_timeout: Option<Duration>, | ||
| max_message_size: usize, | ||
| sasl_config: Option<SaslConfig>, | ||
| } | ||
|
|
@@ -81,6 +82,7 @@ impl RpcClient { | |
| connections: Default::default(), | ||
| client_id: Arc::from(""), | ||
| timeout: None, | ||
| request_timeout: None, | ||
| max_message_size: usize::MAX, | ||
| sasl_config: None, | ||
| } | ||
|
|
@@ -91,6 +93,11 @@ impl RpcClient { | |
| self | ||
| } | ||
|
|
||
| pub fn with_request_timeout(mut self, timeout: Duration) -> Self { | ||
| self.request_timeout = Some(timeout); | ||
| self | ||
|
charlesdong1991 marked this conversation as resolved.
|
||
| } | ||
|
|
||
| pub fn with_sasl(mut self, username: String, password: String) -> Self { | ||
| self.sasl_config = Some(SaslConfig { username, password }); | ||
| self | ||
|
|
@@ -133,6 +140,7 @@ impl RpcClient { | |
| BufStream::new(transport), | ||
| self.max_message_size, | ||
| self.client_id.clone(), | ||
| self.request_timeout, | ||
| ); | ||
| let connection = ServerConnection::new(messenger); | ||
|
|
||
|
|
@@ -266,14 +274,25 @@ pub struct ServerConnectionInner<RW> { | |
|
|
||
| state: Arc<Mutex<ConnectionState>>, | ||
|
|
||
| /// Per-request timeout applied to the response-wait phase only. | ||
| /// The send (write) phase is not covered; a stalled `send_message` can | ||
| /// exceed this duration. | ||
| /// TODO: Full RPC deadline semantics are a potential future enhancement. | ||
| request_timeout: Option<Duration>, | ||
|
|
||
| join_handle: JoinHandle<()>, | ||
| } | ||
|
|
||
| impl<RW> ServerConnectionInner<RW> | ||
| where | ||
| RW: AsyncRead + AsyncWrite + Send + 'static, | ||
| { | ||
| pub fn new(stream: RW, max_message_size: usize, client_id: Arc<str>) -> Self { | ||
| pub fn new( | ||
| stream: RW, | ||
| max_message_size: usize, | ||
| client_id: Arc<str>, | ||
| request_timeout: Option<Duration>, | ||
| ) -> Self { | ||
| let (stream_read, stream_write) = tokio::io::split(stream); | ||
| let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default()))); | ||
| let state_captured = Arc::clone(&state); | ||
|
|
@@ -301,9 +320,9 @@ where | |
| match map.remove(&header.request_id) { | ||
| Some(active_request) => active_request, | ||
| _ => { | ||
| log::warn!( | ||
| log::debug!( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: why change to debug?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think debug fits better here, because this is a normal late-response race after a timeout/cancel rather than an operational warning, IMHO. In a production environment we would probably use a metric/counter, which is likely better than warn-level log spam. WDYT? @luoyuxia
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let me know what you think of this. In the meantime, I rebased and resolved the conflicts — thanks a lot! @luoyuxia 🙏
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree this is a normal late-response race. However, a request timeout still suggests some underlying issue or risk that users should be aware of. Metrics would help, but logs are often more immediately useful in practice. Fluss's Java client logs this as warn. I'm not sure whether the Kafka client does the same for a similar case, but the Kafka client may be a better reference if we want to follow common client behavior. So I'm slightly leaning toward keeping warn for now.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the response, @luoyuxia. I checked the Kafka client: it seems Kafka closes the connection on timeout, so IIUC late responses can never arrive — i.e., Kafka ignores the response for a request after it times out. But it seems it treats it as so I think here I will switch to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated! PTAL @luoyuxia — thank you! |
||
| request_id:% = header.request_id; | ||
| "Got response for unknown request", | ||
| "Ignoring response for unknown request (likely timed out or cancelled)", | ||
| ); | ||
| continue; | ||
| } | ||
|
|
@@ -337,6 +356,7 @@ where | |
| client_id, | ||
| request_id: AtomicI32::new(0), | ||
| state, | ||
| request_timeout, | ||
| join_handle, | ||
| } | ||
| } | ||
|
|
@@ -388,10 +408,36 @@ where | |
|
|
||
| self.send_message(buf).await?; | ||
| _cleanup_on_cancel.message_sent(); | ||
| let mut response = rx.await.map_err(|e| Error::UnexpectedError { | ||
| message: "Got recvError, some one close the channel".to_string(), | ||
| source: Some(Box::new(e)), | ||
| })??; | ||
| let mut response = match self.request_timeout { | ||
| Some(timeout) => match tokio::time::timeout(timeout, rx).await { | ||
| Ok(result) => result.map_err(|e| Error::UnexpectedError { | ||
| message: format!( | ||
| "Response channel closed for request_id={request_id} api_key={:?}; \ | ||
| connection may be closed or poisoned", | ||
| R::API_KEY | ||
| ), | ||
| source: Some(Box::new(e)), | ||
| })??, | ||
| Err(_elapsed) => { | ||
| if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() { | ||
| map.remove(&request_id); | ||
| } | ||
| return Err(RpcError::RequestTimeout { | ||
| timeout, | ||
| api_key: R::API_KEY, | ||
| } | ||
| .into()); | ||
|
charlesdong1991 marked this conversation as resolved.
|
||
| } | ||
| }, | ||
| None => rx.await.map_err(|e| Error::UnexpectedError { | ||
|
charlesdong1991 marked this conversation as resolved.
Outdated
|
||
| message: format!( | ||
| "Response channel closed for request_id={request_id} api_key={:?}; \ | ||
| connection may be closed or poisoned", | ||
| R::API_KEY | ||
| ), | ||
| source: Some(Box::new(e)), | ||
| })??, | ||
| }; | ||
|
|
||
| if let Some(error_response) = response.header.error_response { | ||
| return Err(Error::FlussAPIError { | ||
|
|
@@ -561,3 +607,72 @@ impl Drop for CleanupRequestStateOnCancel { | |
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::metadata::TablePath; | ||
| use crate::rpc::message::TableExistsRequest; | ||
|
|
||
| #[tokio::test] | ||
| async fn test_request_timeout() { | ||
| // Create a duplex stream where the "server" side never responds. | ||
| let (client_stream, _server_stream) = tokio::io::duplex(1024); | ||
|
|
||
| let conn = ServerConnectionInner::new( | ||
| BufStream::new(client_stream), | ||
| usize::MAX, | ||
| Arc::from("test"), | ||
| Some(Duration::from_millis(50)), | ||
| ); | ||
|
charlesdong1991 marked this conversation as resolved.
Outdated
|
||
|
|
||
| let table_path = TablePath::new("db", "table"); | ||
| let request = TableExistsRequest::new(&table_path); | ||
| let result = conn.request(request).await; | ||
|
|
||
| assert!(result.is_err()); | ||
| let err = result.unwrap_err(); | ||
| assert!( | ||
| matches!( | ||
| err, | ||
| Error::RpcError { | ||
| source: RpcError::RequestTimeout { .. }, | ||
| .. | ||
| } | ||
| ), | ||
| "expected RequestTimeout, got: {err}" | ||
| ); | ||
|
|
||
| // Timeout must not poison the connection — other requests should still work. | ||
| assert!(!conn.is_poisoned()); | ||
|
|
||
| // The timed-out request must be removed from the request map (no state leak). | ||
| if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() { | ||
| assert!(map.is_empty(), "request map should be empty after timeout"); | ||
| } else { | ||
| panic!("connection should not be poisoned after a timeout"); | ||
| } | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_request_no_timeout() { | ||
| // With no request timeout configured, request should remain pending | ||
| // when the server does not respond. | ||
| let (client_stream, _server_stream) = tokio::io::duplex(1024); | ||
|
|
||
| let conn = ServerConnectionInner::new( | ||
| BufStream::new(client_stream), | ||
| usize::MAX, | ||
| Arc::from("test"), | ||
| None, | ||
| ); | ||
|
|
||
| let table_path = TablePath::new("db", "table"); | ||
| let request = TableExistsRequest::new(&table_path); | ||
| let pending = tokio::time::timeout(Duration::from_millis(50), conn.request(request)).await; | ||
| assert!( | ||
| pending.is_err(), | ||
| "expected request to remain pending without per-request timeout" | ||
| ); | ||
|
charlesdong1991 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.