diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index b78757b9ca..bd3d44a7f8 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use futures_core::ready; use http::{Request, Response}; use httparse::ParserConfig; +use tokio::sync::oneshot::Receiver; use super::super::dispatch::{self, TrySendError}; use crate::body::{Body, Incoming as IncomingBody}; @@ -210,26 +211,12 @@ where /// hyper closes the underlying connection when a request future is /// dropped before completion. Any subsequent calls on the same /// [`SendRequest`] will return a `canceled` error. - pub fn send_request( - &mut self, - req: Request, - ) -> impl Future>> { - let sent = self.dispatch.send(req); - - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_canceled) => panic!("dispatch dropped without returning error"), - }, - Err(_req) => { - debug!("connection was not ready"); - Err(crate::Error::new_canceled().with("connection was not ready")) - } - } - } + pub fn send_request(&mut self, req: Request) -> SendFut { + let state = match self.dispatch.send(req) { + Ok(rx) => SendFutState::Fut { rx }, + Err(_) => SendFutState::Err, + }; + SendFut { state } } /// Sends a `Request` on the associated connection. @@ -240,29 +227,14 @@ where /// /// If there was an error before trying to serialize the request to the /// connection, the message will be returned as part of this error. - pub fn try_send_request( - &mut self, - req: Request, - ) -> impl Future, TrySendError>>> { - let sent = self.dispatch.try_send(req); - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - }, - Err(req) => { - debug!("connection was not ready"); - let error = crate::Error::new_canceled().with("connection was not ready"); - Err(TrySendError { - error, - message: Some(req), - }) - } - } - } + pub fn try_send_request(&mut self, req: Request) -> TrySendFut { + let state = match self.dispatch.try_send(req) { + Ok(rx) => TrySendFutState::Fut { rx }, + Err(req) => TrySendFutState::Err { + req: Box::new(Some(req)), + }, + }; + TrySendFut { state } } } @@ -272,6 +244,89 @@ impl fmt::Debug for SendRequest { } } +enum SendFutState { + Fut { + rx: Receiver>>, + }, + Err, +} + +/// Future returned by [`SendRequest::send_request`], see the method definition +/// for more details. +pub struct SendFut { + state: SendFutState, +} + +impl fmt::Debug for SendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendFut").finish() + } +} + +impl Future for SendFut { + type Output = crate::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + SendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + SendFutState::Err => { + debug!("connection was not ready"); + Err(crate::Error::new_canceled().with("connection was not ready")) + } + }; + Poll::Ready(res) + } +} + +enum TrySendFutState { + Fut { + rx: Receiver, TrySendError>>>, + }, + Err { + req: Box>>, + }, +} + +/// Future returned by [`SendRequest::try_send_request`], see the method +/// definition for more details. +pub struct TrySendFut { + state: TrySendFutState, +} + +impl fmt::Debug for TrySendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrySendFut").finish() + } +} + +impl Future for TrySendFut { + type Output = Result, TrySendError>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + TrySendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + TrySendFutState::Err { req } => { + debug!("connection was not ready"); + Err(TrySendError { + error: crate::Error::new_canceled().with("connection was not ready"), + message: Some(req.take().expect("future already resolved")), + }) + } + }; + Poll::Ready(res) + } +} + // ===== impl Connection impl Connection diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 5ffad524be..bf13b401f6 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -12,6 +12,7 @@ use std::time::Duration; use crate::rt::{Read, Write}; use futures_core::ready; use http::{Request, Response}; +use tokio::sync::oneshot::Receiver; use super::super::dispatch::{self, TrySendError}; use crate::body::{Body, Incoming as IncomingBody}; @@ -147,27 +148,12 @@ where /// other in-flight and future requests. The peer is notified /// immediately rather than continuing to send a response body that /// would be discarded. - pub fn send_request( - &mut self, - req: Request, - ) -> impl Future>> { - let sent = self.dispatch.send(req); - - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_canceled) => panic!("dispatch dropped without returning error"), - }, - Err(_req) => { - debug!("connection was not ready"); - - Err(crate::Error::new_canceled().with("connection was not ready")) - } - } - } + pub fn send_request(&mut self, req: Request) -> SendFut { + let state = match self.dispatch.send(req) { + Ok(rx) => SendFutState::Fut { rx }, + Err(_) => SendFutState::Err, + }; + SendFut { state } } /// Sends a `Request` on the associated connection. @@ -178,29 +164,14 @@ where /// /// If there was an error before trying to serialize the request to the /// connection, the message will be returned as part of this error. - pub fn try_send_request( - &mut self, - req: Request, - ) -> impl Future, TrySendError>>> { - let sent = self.dispatch.try_send(req); - async move { - match sent { - Ok(rx) => match rx.await { - Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - }, - Err(req) => { - debug!("connection was not ready"); - let error = crate::Error::new_canceled().with("connection was not ready"); - Err(TrySendError { - error, - message: Some(req), - }) - } - } - } + pub fn try_send_request(&mut self, req: Request) -> TrySendFut { + let state = match self.dispatch.try_send(req) { + Ok(rx) => TrySendFutState::Fut { rx }, + Err(req) => TrySendFutState::Err { + req: Box::new(Some(req)), + }, + }; + TrySendFut { state } } } @@ -210,6 +181,89 @@ impl fmt::Debug for SendRequest { } } +enum SendFutState { + Fut { + rx: Receiver>>, + }, + Err, +} + +/// Future returned by [`SendRequest::send_request`], see the method definition +/// for more details. +pub struct SendFut { + state: SendFutState, +} + +impl fmt::Debug for SendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendFut").finish() + } +} + +impl Future for SendFut { + type Output = crate::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + SendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + SendFutState::Err => { + debug!("connection was not ready"); + Err(crate::Error::new_canceled().with("connection was not ready")) + } + }; + Poll::Ready(res) + } +} + +enum TrySendFutState { + Fut { + rx: Receiver, TrySendError>>>, + }, + Err { + req: Box>>, + }, +} + +/// Future returned by [`SendRequest::try_send_request`], see the method +/// definition for more details. +pub struct TrySendFut { + state: TrySendFutState, +} + +impl fmt::Debug for TrySendFut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrySendFut").finish() + } +} + +impl Future for TrySendFut { + type Output = Result, TrySendError>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match &mut self.state { + TrySendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + TrySendFutState::Err { req } => { + debug!("connection was not ready"); + Err(TrySendError { + error: crate::Error::new_canceled().with("connection was not ready"), + message: Some(req.take().expect("future already resolved")), + }) + } + }; + Poll::Ready(res) + } +} + // ===== impl Connection impl Connection