From 5086145c8a3a5058e9780d4ce5364a95ef7d7a0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20=C5=81abor?= Date: Tue, 9 Jun 2026 12:27:08 -0400 Subject: [PATCH 1/4] refactor(client): concrete future types for `send_request` methods --- src/client/conn/http1.rs | 147 ++++++++++++++++++++++++++------------ src/client/conn/http2.rs | 148 +++++++++++++++++++++++++++------------ 2 files changed, 208 insertions(+), 87 deletions(-) diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index b78757b9ca..485596d54c 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use futures_core::ready; use http::{Request, Response}; use httparse::ParserConfig; +use tokio::sync::oneshot::Receiver; use super::super::dispatch::{self, TrySendError}; use crate::body::{Body, Incoming as IncomingBody}; @@ -210,26 +211,12 @@ where /// hyper closes the underlying connection when a request future is /// dropped before completion. Any subsequent calls on the same /// [`SendRequest`] will return a `canceled` error. - pub fn send_request( - &mut self, - req: Request, - ) -> impl Future>> { - let sent = self.dispatch.send(req); - - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_canceled) => panic!("dispatch dropped without returning error"), - }, - Err(_req) => { - debug!("connection was not ready"); - Err(crate::Error::new_canceled().with("connection was not ready")) - } - } - } + pub fn send_request(&mut self, req: Request) -> SendFut { + let state = match self.dispatch.send(req) { + Ok(rx) => SendFutState::Fut { rx }, + Err(req) => SendFutState::Err { req }, + }; + SendFut { state } } /// Sends a `Request` on the associated connection. @@ -240,29 +227,12 @@ where /// /// If there was an error before trying to serialize the request to the /// connection, the message will be returned as part of this error. - pub fn try_send_request( - &mut self, - req: Request, - ) -> impl Future, TrySendError>>> { - let sent = self.dispatch.try_send(req); - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - }, - Err(req) => { - debug!("connection was not ready"); - let error = crate::Error::new_canceled().with("connection was not ready"); - Err(TrySendError { - error, - message: Some(req), - }) - } - } - } + pub fn try_send_request(&mut self, req: Request) -> TrySendFut { + let state = match self.dispatch.try_send(req) { + Ok(rx) => TrySendFutState::Fut { rx }, + Err(req) => TrySendFutState::Err { req: Some(req) }, + }; + TrySendFut { state } } } @@ -272,6 +242,97 @@ impl fmt::Debug for SendRequest { } } +enum SendFutState { + Fut { + rx: Receiver>>, + }, + Err { + req: Request, + }, +} + +/// Future returned by [`SendRequest::send_request`], see the method definition +/// for more details. +pub struct SendFut { + state: SendFutState, +} + +impl fmt::Debug for SendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendFut").finish() + } +} + +// `SendFut` never structurally pins any of its fields +impl Unpin for SendFut {} + +impl Future for SendFut { + type Output = crate::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + SendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + SendFutState::Err { req: _req } => { + debug!("connection was not ready"); + Err(crate::Error::new_canceled().with("connection was not ready")) + } + }; + Poll::Ready(res) + } +} + +enum TrySendFutState { + Fut { + rx: Receiver, TrySendError>>>, + }, + Err { + req: Option>, + }, +} + +/// Future returned by [`SendRequest::try_send_request`], see the method +/// definition for more details. +pub struct TrySendFut { + state: TrySendFutState, +} + +impl fmt::Debug for TrySendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrySendFut").finish() + } +} + +// `TrySendFut` never structurally pins any of its fields +impl Unpin for TrySendFut {} + +impl Future for TrySendFut { + type Output = Result, TrySendError>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + TrySendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + TrySendFutState::Err { req } => { + debug!("connection was not ready"); + Err(TrySendError { + error: crate::Error::new_canceled().with("connection was not ready"), + message: Some(req.take().expect("future already resolved")), + }) + } + }; + Poll::Ready(res) + } +} + // ===== impl Connection impl Connection diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 5ffad524be..6db6bdf846 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -12,6 +12,7 @@ use std::time::Duration; use crate::rt::{Read, Write}; use futures_core::ready; use http::{Request, Response}; +use tokio::sync::oneshot::Receiver; use super::super::dispatch::{self, TrySendError}; use crate::body::{Body, Incoming as IncomingBody}; @@ -147,27 +148,12 @@ where /// other in-flight and future requests. The peer is notified /// immediately rather than continuing to send a response body that /// would be discarded. - pub fn send_request( - &mut self, - req: Request, - ) -> impl Future>> { - let sent = self.dispatch.send(req); - - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_canceled) => panic!("dispatch dropped without returning error"), - }, - Err(_req) => { - debug!("connection was not ready"); - - Err(crate::Error::new_canceled().with("connection was not ready")) - } - } - } + pub fn send_request(&mut self, req: Request) -> SendFut { + let state = match self.dispatch.send(req) { + Ok(rx) => SendFutState::Fut { rx }, + Err(req) => SendFutState::Err { req }, + }; + SendFut { state } } /// Sends a `Request` on the associated connection. @@ -178,29 +164,12 @@ where /// /// If there was an error before trying to serialize the request to the /// connection, the message will be returned as part of this error. - pub fn try_send_request( - &mut self, - req: Request, - ) -> impl Future, TrySendError>>> { - let sent = self.dispatch.try_send(req); - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - }, - Err(req) => { - debug!("connection was not ready"); - let error = crate::Error::new_canceled().with("connection was not ready"); - Err(TrySendError { - error, - message: Some(req), - }) - } - } - } + pub fn try_send_request(&mut self, req: Request) -> TrySendFut { + let state = match self.dispatch.try_send(req) { + Ok(rx) => TrySendFutState::Fut { rx }, + Err(req) => TrySendFutState::Err { req: Some(req) }, + }; + TrySendFut { state } } } @@ -210,6 +179,97 @@ impl fmt::Debug for SendRequest { } } +enum SendFutState { + Fut { + rx: Receiver>>, + }, + Err { + req: Request, + }, +} + +/// Future returned by [`SendRequest::send_request`], see the method definition +/// for more details. +pub struct SendFut { + state: SendFutState, +} + +impl fmt::Debug for SendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendFut").finish() + } +} + +// `SendFut` never structurally pins any of its fields +impl Unpin for SendFut {} + +impl Future for SendFut { + type Output = crate::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + SendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + SendFutState::Err { req: _req } => { + debug!("connection was not ready"); + Err(crate::Error::new_canceled().with("connection was not ready")) + } + }; + Poll::Ready(res) + } +} + +enum TrySendFutState { + Fut { + rx: Receiver, TrySendError>>>, + }, + Err { + req: Option>, + }, +} + +/// Future returned by [`SendRequest::try_send_request`], see the method +/// definition for more details. +pub struct TrySendFut { + state: TrySendFutState, +} + +impl fmt::Debug for TrySendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrySendFut").finish() + } +} + +// `TrySendFut` never structurally pins any of its fields +impl Unpin for TrySendFut {} + +impl Future for TrySendFut { + type Output = Result, TrySendError>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + TrySendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + TrySendFutState::Err { req } => { + debug!("connection was not ready"); + Err(TrySendError { + error: crate::Error::new_canceled().with("connection was not ready"), + message: Some(req.take().expect("future already resolved")), + }) + } + }; + Poll::Ready(res) + } +} + // ===== impl Connection impl Connection From 22a1d3e745ebb6d54560da243f38f2b5ed883e5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20=C5=81abor?= Date: Tue, 9 Jun 2026 12:27:32 -0400 Subject: [PATCH 2/4] refactor(client): remove `Request` from `SendFut` --- src/client/conn/http1.rs | 19 +++++++++++-------- src/client/conn/http2.rs | 18 ++++++++++-------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 485596d54c..d36d70b927 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -3,6 +3,7 @@ use std::error::Error as StdError; use std::fmt; use std::future::Future; +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -214,9 +215,12 @@ where pub fn send_request(&mut self, req: Request) -> SendFut { let state = match self.dispatch.send(req) { Ok(rx) => SendFutState::Fut { rx }, - Err(req) => SendFutState::Err { req }, + Err(_) => SendFutState::Err, }; - SendFut { state } + SendFut { + state, + body: PhantomData, + } } /// Sends a `Request` on the associated connection. @@ -242,19 +246,18 @@ impl fmt::Debug for SendRequest { } } -enum SendFutState { +enum SendFutState { Fut { rx: Receiver>>, }, - Err { - req: Request, - }, + Err, } /// Future returned by [`SendRequest::send_request`], see the method definition /// for more details. pub struct SendFut { - state: SendFutState, + state: SendFutState, + body: PhantomData, } impl fmt::Debug for SendFut { @@ -277,7 +280,7 @@ impl Future for SendFut { // this is definite bug if it happens, but it shouldn't happen! Err(_) => panic!("dispatch dropped without returning error"), }, - SendFutState::Err { req: _req } => { + SendFutState::Err => { debug!("connection was not ready"); Err(crate::Error::new_canceled().with("connection was not ready")) } diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 6db6bdf846..b6c5f772ad 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -151,9 +151,12 @@ where pub fn send_request(&mut self, req: Request) -> SendFut { let state = match self.dispatch.send(req) { Ok(rx) => SendFutState::Fut { rx }, - Err(req) => SendFutState::Err { req }, + Err(_) => SendFutState::Err, }; - SendFut { state } + SendFut { + state, + body: PhantomData, + } } /// Sends a `Request` on the associated connection. @@ -179,19 +182,18 @@ impl fmt::Debug for SendRequest { } } -enum SendFutState { +enum SendFutState { Fut { rx: Receiver>>, }, - Err { - req: Request, - }, + Err, } /// Future returned by [`SendRequest::send_request`], see the method definition /// for more details. pub struct SendFut { - state: SendFutState, + state: SendFutState, + body: PhantomData, } impl fmt::Debug for SendFut { @@ -214,7 +216,7 @@ impl Future for SendFut { // this is definite bug if it happens, but it shouldn't happen! Err(_) => panic!("dispatch dropped without returning error"), }, - SendFutState::Err { req: _req } => { + SendFutState::Err => { debug!("connection was not ready"); Err(crate::Error::new_canceled().with("connection was not ready")) } From 0ad3b8b55cbbb2e2d59581e2d229bff9aa187a7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20=C5=81abor?= Date: Tue, 9 Jun 2026 12:27:50 -0400 Subject: [PATCH 3/4] refactor(client): remove body generic parameter from `SendFut` --- src/client/conn/http1.rs | 18 +++++------------- src/client/conn/http2.rs | 17 +++++------------ 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index d36d70b927..517b6b7543 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -3,7 +3,6 @@ use std::error::Error as StdError; use std::fmt; use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -212,15 +211,12 @@ where /// hyper closes the underlying connection when a request future is /// dropped before completion. Any subsequent calls on the same /// [`SendRequest`] will return a `canceled` error. - pub fn send_request(&mut self, req: Request) -> SendFut { + pub fn send_request(&mut self, req: Request) -> SendFut { let state = match self.dispatch.send(req) { Ok(rx) => SendFutState::Fut { rx }, Err(_) => SendFutState::Err, }; - SendFut { - state, - body: PhantomData, - } + SendFut { state } } /// Sends a `Request` on the associated connection. @@ -255,21 +251,17 @@ enum SendFutState { /// Future returned by [`SendRequest::send_request`], see the method definition /// for more details. -pub struct SendFut { +pub struct SendFut { state: SendFutState, - body: PhantomData, } -impl fmt::Debug for SendFut { +impl fmt::Debug for SendFut { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendFut").finish() } } -// `SendFut` never structurally pins any of its fields -impl Unpin for SendFut {} - -impl Future for SendFut { +impl Future for SendFut { type Output = crate::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index b6c5f772ad..81adaf4c76 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -148,15 +148,12 @@ where /// other in-flight and future requests. The peer is notified /// immediately rather than continuing to send a response body that /// would be discarded. - pub fn send_request(&mut self, req: Request) -> SendFut { + pub fn send_request(&mut self, req: Request) -> SendFut { let state = match self.dispatch.send(req) { Ok(rx) => SendFutState::Fut { rx }, Err(_) => SendFutState::Err, }; - SendFut { - state, - body: PhantomData, - } + SendFut { state } } /// Sends a `Request` on the associated connection. @@ -191,21 +188,17 @@ enum SendFutState { /// Future returned by [`SendRequest::send_request`], see the method definition /// for more details. -pub struct SendFut { +pub struct SendFut { state: SendFutState, - body: PhantomData, } -impl fmt::Debug for SendFut { +impl fmt::Debug for SendFut { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendFut").finish() } } -// `SendFut` never structurally pins any of its fields -impl Unpin for SendFut {} - -impl Future for SendFut { +impl Future for SendFut { type Output = crate::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { From 19a084ca82c937dd10c0b97641fedbe897e423e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20=C5=81abor?= Date: Tue, 9 Jun 2026 13:09:51 -0400 Subject: [PATCH 4/4] refactor(client): box error variant in `TrySendFut` --- src/client/conn/http1.rs | 9 ++++----- src/client/conn/http2.rs | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 517b6b7543..bd3d44a7f8 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -230,7 +230,9 @@ where pub fn try_send_request(&mut self, req: Request) -> TrySendFut { let state = match self.dispatch.try_send(req) { Ok(rx) => TrySendFutState::Fut { rx }, - Err(req) => TrySendFutState::Err { req: Some(req) }, + Err(req) => TrySendFutState::Err { + req: Box::new(Some(req)), + }, }; TrySendFut { state } } @@ -286,7 +288,7 @@ enum TrySendFutState { rx: Receiver, TrySendError>>>, }, Err { - req: Option>, + req: Box>>, }, } @@ -302,9 +304,6 @@ impl fmt::Debug for TrySendFut { } } -// `TrySendFut` never structurally pins any of its fields -impl Unpin for TrySendFut {} - impl Future for TrySendFut { type Output = Result, TrySendError>>; diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 81adaf4c76..bf13b401f6 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -167,7 +167,9 @@ where pub fn try_send_request(&mut self, req: Request) -> TrySendFut { let state = match self.dispatch.try_send(req) { Ok(rx) => TrySendFutState::Fut { rx }, - Err(req) => TrySendFutState::Err { req: Some(req) }, + Err(req) => TrySendFutState::Err { + req: Box::new(Some(req)), + }, }; TrySendFut { state } } @@ -223,7 +225,7 @@ enum TrySendFutState { rx: Receiver, TrySendError>>>, }, Err { - req: Option>, + req: Box>>, }, } @@ -239,9 +241,6 @@ impl fmt::Debug for TrySendFut { } } -// `TrySendFut` never structurally pins any of its fields -impl Unpin for TrySendFut {} - impl Future for TrySendFut { type Output = Result, TrySendError>>;