Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 98 additions & 43 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bytes::Bytes;
use futures_core::ready;
use http::{Request, Response};
use httparse::ParserConfig;
use tokio::sync::oneshot::Receiver;

use super::super::dispatch::{self, TrySendError};
use crate::body::{Body, Incoming as IncomingBody};
Expand Down Expand Up @@ -210,26 +211,12 @@ where
/// hyper closes the underlying connection when a request future is
/// dropped before completion. Any subsequent calls on the same
/// [`SendRequest`] will return a `canceled` error.
pub fn send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
let sent = self.dispatch.send(req);

async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
},
Err(_req) => {
debug!("connection was not ready");
Err(crate::Error::new_canceled().with("connection was not ready"))
}
}
}
pub fn send_request(&mut self, req: Request<B>) -> SendFut {
let state = match self.dispatch.send(req) {
Ok(rx) => SendFutState::Fut { rx },
Err(_) => SendFutState::Err,
};
SendFut { state }
}

/// Sends a `Request` on the associated connection.
Expand All @@ -240,29 +227,14 @@ where
///
/// If there was an error before trying to serialize the request to the
/// connection, the message will be returned as part of this error.
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = crate::Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
pub fn try_send_request(&mut self, req: Request<B>) -> TrySendFut<B> {
let state = match self.dispatch.try_send(req) {
Ok(rx) => TrySendFutState::Fut { rx },
Err(req) => TrySendFutState::Err {
req: Box::new(Some(req)),
},
};
TrySendFut { state }
}
}

Expand All @@ -272,6 +244,89 @@ impl<B> fmt::Debug for SendRequest<B> {
}
}

enum SendFutState {
Fut {
rx: Receiver<crate::Result<Response<IncomingBody>>>,
},
Err,
}

/// Future returned by [`SendRequest::send_request`], see the method definition
/// for more details.
pub struct SendFut {
state: SendFutState,
}

impl fmt::Debug for SendFut {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendFut").finish()
}
}

impl Future for SendFut {
type Output = crate::Result<Response<IncomingBody>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = match &mut self.state {
SendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
SendFutState::Err => {
debug!("connection was not ready");
Err(crate::Error::new_canceled().with("connection was not ready"))
}
};
Poll::Ready(res)
}
}

enum TrySendFutState<B> {
Fut {
rx: Receiver<Result<Response<IncomingBody>, TrySendError<Request<B>>>>,
},
Err {
req: Box<Option<Request<B>>>,
},
}

/// Future returned by [`SendRequest::try_send_request`], see the method
/// definition for more details.
pub struct TrySendFut<B> {
state: TrySendFutState<B>,
}

impl<B> fmt::Debug for TrySendFut<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TrySendFut").finish()
}
}

impl<B> Future for TrySendFut<B> {
type Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = match &mut self.state {
TrySendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
TrySendFutState::Err { req } => {
debug!("connection was not ready");
Err(TrySendError {
error: crate::Error::new_canceled().with("connection was not ready"),
message: Some(req.take().expect("future already resolved")),
})
}
};
Poll::Ready(res)
}
}

// ===== impl Connection

impl<T, B> Connection<T, B>
Expand Down
142 changes: 98 additions & 44 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::time::Duration;
use crate::rt::{Read, Write};
use futures_core::ready;
use http::{Request, Response};
use tokio::sync::oneshot::Receiver;

use super::super::dispatch::{self, TrySendError};
use crate::body::{Body, Incoming as IncomingBody};
Expand Down Expand Up @@ -147,27 +148,12 @@ where
/// other in-flight and future requests. The peer is notified
/// immediately rather than continuing to send a response body that
/// would be discarded.
pub fn send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
let sent = self.dispatch.send(req);

async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
},
Err(_req) => {
debug!("connection was not ready");

Err(crate::Error::new_canceled().with("connection was not ready"))
}
}
}
pub fn send_request(&mut self, req: Request<B>) -> SendFut {
let state = match self.dispatch.send(req) {
Ok(rx) => SendFutState::Fut { rx },
Err(_) => SendFutState::Err,
};
SendFut { state }
}

/// Sends a `Request` on the associated connection.
Expand All @@ -178,29 +164,14 @@ where
///
/// If there was an error before trying to serialize the request to the
/// connection, the message will be returned as part of this error.
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = crate::Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
pub fn try_send_request(&mut self, req: Request<B>) -> TrySendFut<B> {
let state = match self.dispatch.try_send(req) {
Ok(rx) => TrySendFutState::Fut { rx },
Err(req) => TrySendFutState::Err {
req: Box::new(Some(req)),
},
};
TrySendFut { state }
}
}

Expand All @@ -210,6 +181,89 @@ impl<B> fmt::Debug for SendRequest<B> {
}
}

enum SendFutState {
Fut {
rx: Receiver<crate::Result<Response<IncomingBody>>>,
},
Err,
}

/// Future returned by [`SendRequest::send_request`], see the method definition
/// for more details.
pub struct SendFut {
state: SendFutState,
}

impl fmt::Debug for SendFut {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendFut").finish()
}
}

impl Future for SendFut {
type Output = crate::Result<Response<IncomingBody>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = match &mut self.state {
SendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
SendFutState::Err => {
debug!("connection was not ready");
Err(crate::Error::new_canceled().with("connection was not ready"))
}
};
Poll::Ready(res)
}
}

enum TrySendFutState<B> {
Fut {
rx: Receiver<Result<Response<IncomingBody>, TrySendError<Request<B>>>>,
},
Err {
req: Box<Option<Request<B>>>,
},
}

/// Future returned by [`SendRequest::try_send_request`], see the method
/// definition for more details.
pub struct TrySendFut<B> {
state: TrySendFutState<B>,
}

impl<B> fmt::Debug for TrySendFut<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TrySendFut").finish()
}
}

impl<B> Future for TrySendFut<B> {
type Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = match &mut self.state {
TrySendFutState::Fut { rx } => match ready!(Pin::new(rx).poll(cx)) {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
TrySendFutState::Err { req } => {
debug!("connection was not ready");
Err(TrySendError {
error: crate::Error::new_canceled().with("connection was not ready"),
message: Some(req.take().expect("future already resolved")),
})
}
};
Poll::Ready(res)
}
}

// ===== impl Connection

impl<T, B, E> Connection<T, B, E>
Expand Down
Loading