-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.rs
More file actions
110 lines (100 loc) · 3.59 KB
/
client.rs
File metadata and controls
110 lines (100 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
pub(crate) mod engines;
use crate::{Receive, Transmit};
use tower_util::ServiceExt;
/// RPC Client. `Engine` is something that can actually send requests to the
/// RPC server, e.g. hyper::Client or reqwest::Client.
#[derive(Clone)]
pub struct Client<Engine = engines::ReqwestEngine> {
engine: Engine,
base: String,
}
impl<E> Client<E> {
/// Constructs new client from the given engine and base url.
/// Base usually should not end with '/'.
/// All requests will be sent to "{self.base}{R::ENDPOINT}".
pub fn new(engine: E, base: String) -> Self {
Client { engine, base }
}
}
#[derive(Debug)]
pub enum CallError<TransportError, RecvBatchError, SendBatchError> {
Transport(TransportError),
Recv(RecvBatchError),
Send(SendBatchError),
}
impl<TE, RE, SE> CallError<TE, RE, SE> {
pub fn description(&self) -> &'static str {
match self {
CallError::Transport(_) => "transport error",
CallError::Send(_) => "failed to send batch",
CallError::Recv(_) => "failed to receive batch",
}
}
}
impl<TE, RE, SE> std::fmt::Display for CallError<TE, RE, SE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.description().fmt(f)
}
}
impl<
TE: std::error::Error + 'static,
RE: std::error::Error + 'static,
SE: std::error::Error + 'static,
> std::error::Error for CallError<TE, RE, SE>
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
CallError::Transport(inner) => Some(inner),
CallError::Recv(inner) => Some(inner),
CallError::Send(inner) => Some(inner),
}
}
}
impl<E> Client<E>
where
E: hyper::service::Service<
hyper::Request<hyper::Body>,
Response = hyper::Response<hyper::Body>,
>,
{
/// Executes RPC call, writing all Request-sided messages upfront and
/// returning batch of server responses.
pub async fn call<R: crate::Route>(
&mut self,
data: <<R::Request as crate::Direction>::Tx as crate::Transmit>::BatchData,
) -> Result<
<<R::Response as crate::Direction>::Rx as crate::Receive>::BatchData,
CallError<
<E as hyper::service::Service<hyper::Request<hyper::Body>>>::Error,
<<R::Response as crate::Direction>::Rx as crate::Receive>::BatchError,
<<R::Request as crate::Direction>::Tx as crate::Transmit>::BatchError,
>,
> {
let (tx, rx) = self.start::<R>().await.map_err(CallError::Transport)?;
tx.send_batch(data).await.map_err(CallError::Send)?;
rx.recv_batch().await.map_err(CallError::Recv)
}
/// Starts new RPC call.
/// Returns transmitter that can send messages to server,
/// and receiver that can receive messages from server.
pub async fn start<R: crate::Route>(
&mut self,
) -> Result<
(
<R::Request as crate::Direction>::Tx,
<R::Response as crate::Direction>::Rx,
),
<E as hyper::service::Service<hyper::Request<hyper::Body>>>::Error,
> {
let (body_sender, body) = hyper::Body::channel();
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(format!("{}{}", self.base, R::ENDPOINT))
.body(body)
.expect("invalid data");
let tx = <R::Request as crate::Direction>::Tx::from_body_sender(body_sender);
let response = (&mut self.engine).oneshot(req).await?;
let rx = <R::Response as crate::Direction>::Rx::from_body(response.into_body());
Ok((tx, rx))
}
}