Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ autopilot = { path = "crates/autopilot" }
aws-config = "1.5.1"
aws-sdk-s3 = { version = "1.34.0", default-features = false }
bytes-hex = { path = "crates/bytes-hex" }
bytes = "1.11.1"
chain = { path = "crates/chain" }
console-subscriber = "0.3.0"
const_format = "0.2.32"
Expand Down
3 changes: 2 additions & 1 deletion crates/autopilot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ alloy = { workspace = true, features = ["rand", "provider-debug-api", "provider-
app-data = { workspace = true }
axum = { workspace = true }
bytes-hex = { workspace = true } # may get marked as unused but it's used with serde
bytes = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bigdecimal = { workspace = true }
Expand Down Expand Up @@ -47,7 +48,7 @@ order-validation = { workspace = true }
prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["gzip", "json"] }
reqwest = { workspace = true, features = ["gzip", "json", "stream"] }
rust_decimal = { workspace = true }
s3 = { workspace = true }
serde = { workspace = true }
Expand Down
57 changes: 44 additions & 13 deletions crates/autopilot/src/infra/solvers/dto/solve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ use {
crate::{
boundary,
domain::{self, eth},
infra::persistence::dto::{self, order::Order},
infra::{
persistence::dto::{self, order::Order},
solvers::InjectIntoHttpRequest,
},
},
alloy::primitives::{Address, U256},
bytes::Bytes,
chrono::{DateTime, Utc},
itertools::Itertools,
number::serialization::HexOrDecimalU256,
reqwest::RequestBuilder,
serde::{Deserialize, Serialize},
serde_with::{DisplayFromStr, serde_as},
std::{
borrow::Cow,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
},
};
Expand All @@ -21,17 +26,14 @@ use {
/// request. The purpose of this is to make it ergonomic
/// to serialize a request once and reuse the resulting
/// string in multiple HTTP requests.
#[derive(Clone, Debug, Serialize, derive_more::Display)]
pub struct Request(Arc<serde_json::value::RawValue>);

impl Request {
pub fn as_str(&self) -> &str {
self.0.get()
}
#[derive(Clone, Debug)]
pub struct Request {
auction_id: i64,
body: bytes::Bytes,
}

impl Request {
pub fn new(
pub async fn new(
auction: &domain::Auction,
trusted_tokens: &HashSet<Address>,
time_limit: Duration,
Expand Down Expand Up @@ -64,9 +66,38 @@ impl Request {
deadline: Utc::now() + chrono::Duration::from_std(time_limit).unwrap(),
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners.to_vec(),
};
Self(Arc::from(serde_json::value::to_raw_value(&helper).expect(
"only fails with non-string keys which we do not have",
)))
let auction_id = auction.id;

let body = tokio::task::spawn_blocking(move || {
let serialized = serde_json::to_vec(&helper).expect("type should be JSON serializable");
Bytes::from(serialized)
})
.await
.expect("inner task should not panic as serialization should work for the given type");

Self { body, auction_id }
}
}

impl InjectIntoHttpRequest for Request {
fn inject(&self, request: RequestBuilder) -> RequestBuilder {
request
.body(self.body.clone())
// announce which auction this request is for in the
// headers to help the driver detect duplicated
// `/solve` requests before streaming the body
.header("X-Auction-Id", self.auction_id)
// manually set the content type header for JSON since
// we can't use `request.json(self)`
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static("application/json")
)
}

fn body_to_string(&self) -> Cow<'_, str> {
let string = str::from_utf8(self.body.as_ref()).unwrap();
Cow::Borrowed(string)
}
}

Expand Down
45 changes: 29 additions & 16 deletions crates/autopilot/src/infra/solvers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use {
alloy::signers::{Signer, aws::AwsSigner},
anyhow::{Context, Result, anyhow},
observe::tracing::tracing_headers,
reqwest::{Client, StatusCode},
std::time::Duration,
reqwest::{Client, RequestBuilder, StatusCode},
std::{borrow::Cow, time::Duration},
thiserror::Error,
tracing::instrument,
url::Url,
Expand Down Expand Up @@ -122,28 +122,22 @@ impl Driver {
async fn request_response<Response, Request>(
&self,
path: &str,
request: Request,
payload: Request,
) -> Result<Response>
where
Response: serde::de::DeserializeOwned,
Request: serde::Serialize + Send + Sync + 'static,
Request: InjectIntoHttpRequest,
{
let url = util::join(&self.url, path);

tracing::trace!(
path=&url.path(),
body=%serde_json::to_string_pretty(&request).unwrap(),
path = &url.path(),
body = %payload.body_to_string(),
"solver request",
);
let mut request = {
let builder = self.client.post(url.clone()).headers(tracing_headers());
// If the payload is very big then serializing it will block the
// executor a long time (mostly relevant for solve requests).
// That's why we always do it on a thread specifically for
// running blocking tasks.
tokio::task::spawn_blocking(move || builder.json(&request))
.await
.context("failed to build request")?
};

let request = self.client.post(url.clone()).headers(tracing_headers());
let mut request = payload.inject(request);

if let Some(request_id) = observe::distributed_tracing::request_id::from_current_span() {
request = request.header("X-REQUEST-ID", request_id);
Expand Down Expand Up @@ -181,3 +175,22 @@ pub async fn response_body_with_size_limit(
}
Ok(bytes)
}

trait InjectIntoHttpRequest {
fn inject(&self, request: RequestBuilder) -> RequestBuilder;
fn body_to_string(&self) -> Cow<'_, str>;
}

impl<T> InjectIntoHttpRequest for T
where
T: serde::ser::Serialize + Sized,
{
fn inject(&self, request: RequestBuilder) -> RequestBuilder {
request.json(&self)
}

fn body_to_string(&self) -> Cow<'_, str> {
let serialized = serde_json::to_string(&self).expect("type should be JSON serializable");
Cow::Owned(serialized)
}
Comment on lines +192 to +195
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using .unwrap() on the result of serde_json::to_string can cause a panic if serialization fails. In a tracing context, this can crash the service, creating a potential denial-of-service vulnerability. It is safer to handle this error gracefully, as shown in the suggestion. Furthermore, serializing data to JSON and then immediately parsing it back into a serde_json::Value within the same operation is inefficient. For tracing, consider using a tracing::field::Visit implementation to directly collect fields into a serde_json::Map to avoid unnecessary serialization/deserialization cycles, which is a more robust and efficient approach for this context.

    fn body_to_string(&self) -> Cow<'_, str> {
        match serde_json::to_string(self) {
            Ok(serialized) => Cow::Owned(serialized),
            Err(_) => Cow::Borrowed("<failed to serialize body>"),
        }
    }
References
  1. Avoid serializing data to JSON and then immediately parsing it back into a serde_json::Value within the same operation, as this is inefficient and can impact performance. Instead, consider using a tracing::field::Visit implementation to directly collect fields into a serde_json::Map to avoid unnecessary serialization/deserialization cycles.

}
3 changes: 2 additions & 1 deletion crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,8 @@ impl RunLoop {
auction,
&self.trusted_tokens.all(),
self.config.solve_deadline,
);
)
.await;

let mut bids = futures::future::join_all(
self.drivers
Expand Down
3 changes: 2 additions & 1 deletion crates/autopilot/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ impl RunLoop {
/// Runs the solver competition, making all configured drivers participate.
#[instrument(skip_all)]
async fn competition(&self, auction: &domain::Auction) -> Vec<Bid<Unscored>> {
let request = solve::Request::new(auction, &self.trusted_tokens.all(), self.solve_deadline);
let request =
solve::Request::new(auction, &self.trusted_tokens.all(), self.solve_deadline).await;

futures::future::join_all(
self.drivers
Expand Down
Loading